feat: initialize events app with calendars, slots, tickets, and internal API
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled

Extract events/calendar functionality into standalone microservice:
- app.py and events_api.py from apps/events/
- Calendar blueprints (calendars, calendar, calendar_entries, calendar_entry, day, slots, slot, ticket_types, ticket_type)
- Templates for all calendar/event views including admin
- Dockerfile (APP_MODULE=app:app, IMAGE=events)
- entrypoint.sh (no Alembic - migrations managed by blog app)
- Gitea CI workflow for build and deploy

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-09 23:16:32 +00:00
commit 3c0fa45f8c
119 changed files with 7163 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g
)
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
# ---------- Pages ----------
@bp.get("/")
@require_admin
async def admin(slug: str, calendar_slug: str):
from suma_browser.app.utils.htmx import is_htmx_request
# Determine which template to use based on request type
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/calendar/admin/index.html")
else:
# HTMX request: main panel + OOB elements
html = await render_template("_types/calendar/admin/_oob_elements.html")
return await make_response(html)
@bp.get("/description/")
@require_admin
async def calendar_description_edit(slug: str, calendar_slug: str):
# g.post and g.calendar should already be set by the parent calendar bp
html = await render_template(
"_types/calendar/admin/_description_edit.html",
post=g.post_data['post'],
calendar=g.calendar,
)
return await make_response(html)
@bp.post("/description/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def calendar_description_save(slug: str, calendar_slug: str):
form = await request.form
description = (form.get("description") or "").strip() or None
# simple inline update, or call a service if you prefer
g.calendar.description = description
await g.s.flush()
html = await render_template(
"_types/calendar/admin/_description.html",
post=g.post_data['post'],
calendar=g.calendar,
oob=True
)
return await make_response(html)
@bp.get("/description/view/")
@require_admin
async def calendar_description_view(slug: str, calendar_slug: str):
# just render the display version without touching the DB (used by Cancel)
html = await render_template(
"_types/calendar/admin/_description.html",
post=g.post_data['post'],
calendar=g.calendar,
)
return await make_response(html)
return bp

251
bp/calendar/routes.py Normal file
View File

@@ -0,0 +1,251 @@
from __future__ import annotations
from datetime import datetime, timezone
from quart import (
request, render_template, make_response, Blueprint, g, abort, session as qsession
)
from sqlalchemy import select
from models.calendars import Calendar
from sqlalchemy.orm import selectinload, with_loader_criteria
from suma_browser.app.authz import require_admin
from .admin.routes import register as register_admin
from .services import get_visible_entries_for_period
from .services.calendar_view import (
parse_int_arg,
add_months,
build_calendar_weeks,
get_calendar_by_post_and_slug,
get_calendar_by_slug,
update_calendar_description,
)
from suma_browser.app.utils.htmx import is_htmx_request
from ..slots.routes import register as register_slots
from models.calendars import CalendarSlot
from suma_browser.app.bp.calendars.services.calendars import soft_delete
from suma_browser.app.bp.day.routes import register as register_day
from suma_browser.app.redis_cacher import cache_page, clear_cache
from sqlalchemy import select
import calendar as pycalendar
def register():
bp = Blueprint("calendar", __name__, url_prefix='/<calendar_slug>')
bp.register_blueprint(
register_admin(),
)
bp.register_blueprint(
register_slots(),
)
bp.register_blueprint(
register_day()
)
@bp.url_value_preprocessor
def pull(endpoint, values):
g.calendar_slug = values.get("calendar_slug")
@bp.before_request
async def hydrate_calendar_data():
calendar_slug = getattr(g, "calendar_slug", None)
# Standalone mode (events app): no post context
post_data = getattr(g, "post_data", None)
if post_data:
post_id = (post_data.get("post") or {}).get("id")
cal = await get_calendar_by_post_and_slug(g.s, post_id, calendar_slug)
else:
cal = await get_calendar_by_slug(g.s, calendar_slug)
if not cal:
abort(404)
return
g.calendar = cal
@bp.context_processor
async def inject_root():
return {
"calendar": getattr(g, "calendar", None),
}
# ---------- Pages ----------
# ---------- Pages ----------
@bp.get("/")
@cache_page(tag="calendars")
async def get(calendar_slug: str, **kwargs):
"""
Show a month-view calendar for this calendar.
- One month at a time
- Outer arrows: +/- 1 year
- Inner arrows: +/- 1 month
"""
# --- Determine year & month from query params ---
today = datetime.now(timezone.utc).date()
month = parse_int_arg("month")
year = parse_int_arg("year")
if year is None:
year = today.year
if month is None or not (1 <= month <= 12):
month = today.month
# --- Helpers to move between months ---
prev_month_year, prev_month = add_months(year, month, -1)
next_month_year, next_month = add_months(year, month, +1)
prev_year = year - 1
next_year = year + 1
# --- Build weeks grid (list of weeks, each week = 7 days) ---
weeks = build_calendar_weeks(year, month)
month_name = pycalendar.month_name[month]
weekday_names = [pycalendar.day_abbr[i] for i in range(7)]
# --- Period boundaries for this calendar view ---
period_start = datetime(year, month, 1, tzinfo=timezone.utc)
next_y, next_m = add_months(year, month, +1)
period_end = datetime(next_y, next_m, 1, tzinfo=timezone.utc)
# --- Identity & admin flag ---
user = getattr(g, "user", None)
session_id = qsession.get("calendar_sid")
visible = await get_visible_entries_for_period(
sess=g.s,
calendar_id=g.calendar.id,
period_start=period_start,
period_end=period_end,
user=user,
session_id=session_id,
)
month_entries = visible.merged_entries
user_entries = visible.user_entries
confirmed_entries = visible.confirmed_entries
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/calendar/index.html",
qsession=qsession,
year=year,
month=month,
month_name=month_name,
weekday_names=weekday_names,
weeks=weeks,
prev_month=prev_month,
prev_month_year=prev_month_year,
next_month=next_month,
next_month_year=next_month_year,
prev_year=prev_year,
next_year=next_year,
user_entries=user_entries,
confirmed_entries=confirmed_entries,
month_entries=month_entries,
)
else:
html = await render_template(
"_types/calendar/_oob_elements.html",
qsession=qsession,
year=year,
month=month,
month_name=month_name,
weekday_names=weekday_names,
weeks=weeks,
prev_month=prev_month,
prev_month_year=prev_month_year,
next_month=next_month,
next_month_year=next_month_year,
prev_year=prev_year,
next_year=next_year,
user_entries=user_entries,
confirmed_entries=confirmed_entries,
month_entries=month_entries,
)
return await make_response(html)
@bp.put("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def put(calendar_slug: str, **kwargs):
"""
Idempotent update for calendar configuration.
Accepts HTMX form (POST/PUT) and optional JSON.
"""
# Try JSON first
data = await request.get_json(silent=True)
description = None
if data and isinstance(data, dict):
description = (data.get("description") or "").strip()
else:
form = await request.form
description = (form.get("description") or "").strip()
await update_calendar_description(g.calendar, description)
html = await render_template("_types/calendar/admin/index.html")
return await make_response(html, 200)
@bp.delete("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def delete(calendar_slug: str, **kwargs):
from suma_browser.app.utils.htmx import is_htmx_request
cal = g.calendar
cal.deleted_at = datetime.now(timezone.utc)
await g.s.flush()
# If we have post context (blog-embedded mode), update nav
post_data = getattr(g, "post_data", None)
html = await render_template("_types/calendars/index.html")
if post_data:
from ..post.services.entry_associations import get_associated_entries
post_id = (post_data.get("post") or {}).get("id")
cals = (
await g.s.execute(
select(Calendar)
.where(Calendar.post_id == post_id, Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
).scalars().all()
associated_entries = await get_associated_entries(g.s, post_id)
nav_oob = await render_template(
"_types/post/admin/_nav_entries_oob.html",
associated_entries=associated_entries,
calendars=cals,
post=post_data["post"],
)
html = html + nav_oob
return await make_response(html, 200)
return bp

View File

@@ -0,0 +1 @@
from .visiblity import get_visible_entries_for_period

View File

@@ -0,0 +1,24 @@
from sqlalchemy import select, update
from models.calendars import CalendarEntry
from sqlalchemy import func
async def adopt_session_entries_for_user(session, user_id: int, session_id: str | None) -> None:
if not session_id:
return
# (Optional) Mark any existing entries for this user as deleted to avoid duplicates
await session.execute(
update(CalendarEntry)
.where(CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id)
.values(deleted_at=func.now())
)
# Reassign anonymous entries to the user
result = await session.execute(
select(CalendarEntry).where(
CalendarEntry.deleted_at.is_(None),
CalendarEntry.session_id == session_id
)
)
anon_entries = result.scalars().all()
for entry in anon_entries:
entry.user_id = user_id
# No commit here; caller will commit

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from models.calendars import Calendar
from ...calendars.services.calendars import CalendarError
async def update_calendar_config(sess, calendar_id: int, *, description: str | None, slots: list | None):
"""Update description and slots for a calendar."""
cal = await sess.get(Calendar, calendar_id)
if not cal:
raise CalendarError(f"Calendar {calendar_id} not found.")
cal.description = (description or '').strip() or None
# Validate slots shape a bit
norm_slots: list[dict] = []
if slots:
for s in slots:
if not isinstance(s, dict):
continue
norm_slots.append({
"days": str(s.get("days", ""))[:7].lower(),
"time_from": str(s.get("time_from", ""))[:5],
"time_to": str(s.get("time_to", ""))[:5],
"cost_name": (s.get("cost_name") or "")[:64],
"description": (s.get("description") or "")[:255],
})
cal.slots = norm_slots or None
await sess.flush()
return cal

View File

@@ -0,0 +1,109 @@
from __future__ import annotations
from datetime import datetime, timezone
from typing import Optional
import calendar as pycalendar
from quart import request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload, with_loader_criteria
from models.calendars import Calendar, CalendarSlot
def parse_int_arg(name: str, default: Optional[int] = None) -> Optional[int]:
"""Parse an integer query parameter from the request."""
val = request.args.get(name, "").strip()
if not val:
return default
try:
return int(val)
except ValueError:
return default
def add_months(year: int, month: int, delta: int) -> tuple[int, int]:
"""Add (or subtract) months to a given year/month, handling year overflow."""
new_month = month + delta
new_year = year + (new_month - 1) // 12
new_month = ((new_month - 1) % 12) + 1
return new_year, new_month
def build_calendar_weeks(year: int, month: int) -> list[list[dict]]:
"""
Build a calendar grid for the given year and month.
Returns a list of weeks, where each week is a list of 7 day dictionaries.
"""
today = datetime.now(timezone.utc).date()
cal = pycalendar.Calendar(firstweekday=0) # 0 = Monday
weeks: list[list[dict]] = []
for week in cal.monthdatescalendar(year, month):
week_days = []
for d in week:
week_days.append(
{
"date": d,
"in_month": (d.month == month),
"is_today": (d == today),
}
)
weeks.append(week_days)
return weeks
async def get_calendar_by_post_and_slug(
session: AsyncSession,
post_id: int,
calendar_slug: str,
) -> Optional[Calendar]:
"""
Fetch a calendar by post_id and slug, with slots eagerly loaded.
Returns None if not found.
"""
result = await session.execute(
select(Calendar)
.options(
selectinload(Calendar.slots),
with_loader_criteria(CalendarSlot, CalendarSlot.deleted_at.is_(None)),
)
.where(
Calendar.post_id == post_id,
Calendar.slug == calendar_slug,
Calendar.deleted_at.is_(None),
)
)
return result.scalar_one_or_none()
async def get_calendar_by_slug(
session: AsyncSession,
calendar_slug: str,
) -> Optional[Calendar]:
"""
Fetch a calendar by slug only (for standalone events service).
With slots eagerly loaded. Returns None if not found.
"""
result = await session.execute(
select(Calendar)
.options(
selectinload(Calendar.slots),
with_loader_criteria(CalendarSlot, CalendarSlot.deleted_at.is_(None)),
)
.where(
Calendar.slug == calendar_slug,
Calendar.deleted_at.is_(None),
)
)
return result.scalar_one_or_none()
async def update_calendar_description(
calendar: Calendar,
description: Optional[str],
) -> None:
"""Update calendar description (in-place on the calendar object)."""
calendar.description = description or None

View File

@@ -0,0 +1,117 @@
from __future__ import annotations
from datetime import time
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarSlot
class SlotError(ValueError):
pass
def _b(v):
if isinstance(v, bool):
return v
s = str(v).lower()
return s in {"1","true","t","yes","y","on"}
async def list_slots(sess: AsyncSession, calendar_id: int) -> Sequence[CalendarSlot]:
res = await sess.execute(
select(CalendarSlot)
.where(CalendarSlot.calendar_id == calendar_id, CalendarSlot.deleted_at.is_(None))
.order_by(CalendarSlot.time_start.asc(), CalendarSlot.id.asc())
)
return res.scalars().all()
async def create_slot(sess: AsyncSession, calendar_id: int, *, name: str, description: str | None,
days: dict, time_start: time, time_end: time, cost: float | None):
if not name:
raise SlotError("name is required")
if not time_start or not time_end or time_end <= time_start:
raise SlotError("time range invalid")
slot = CalendarSlot(
calendar_id=calendar_id,
name=name,
description=(description or None),
mon=_b(days.get("mon")), tue=_b(days.get("tue")), wed=_b(days.get("wed")),
thu=_b(days.get("thu")), fri=_b(days.get("fri")), sat=_b(days.get("sat")), sun=_b(days.get("sun")),
time_start=time_start, time_end=time_end, cost=cost,
)
sess.add(slot)
await sess.flush()
return slot
async def update_slot(
sess: AsyncSession,
slot_id: int,
*,
name: str | None = None,
description: str | None = None,
days: dict | None = None,
time_start: time | None = None,
time_end: time | None = None,
cost: float | None = None,
flexible: bool | None = None, # NEW
):
slot = await sess.get(CalendarSlot, slot_id)
if not slot or slot.deleted_at is not None:
raise SlotError("slot not found")
if name is not None:
slot.name = name
if description is not None:
slot.description = description or None
if days is not None:
slot.mon = _b(days.get("mon", slot.mon))
slot.tue = _b(days.get("tue", slot.tue))
slot.wed = _b(days.get("wed", slot.wed))
slot.thu = _b(days.get("thu", slot.thu))
slot.fri = _b(days.get("fri", slot.fri))
slot.sat = _b(days.get("sat", slot.sat))
slot.sun = _b(days.get("sun", slot.sun))
if time_start is not None:
slot.time_start = time_start
if time_end is not None:
slot.time_end = time_end
if (time_start or time_end) and slot.time_end <= slot.time_start:
raise SlotError("time range invalid")
if cost is not None:
slot.cost = cost
# NEW: update flexible flag only if explicitly provided
if flexible is not None:
slot.flexible = flexible
await sess.flush()
return slot
async def soft_delete_slot(sess: AsyncSession, slot_id: int):
slot = await sess.get(CalendarSlot, slot_id)
if not slot or slot.deleted_at is not None:
return
from datetime import datetime, timezone
slot.deleted_at = datetime.now(timezone.utc)
await sess.flush()
async def get_slot(sess: AsyncSession, slot_id: int) -> CalendarSlot | None:
return await sess.get(CalendarSlot, slot_id)
async def update_slot_description(
sess: AsyncSession,
slot_id: int,
description: str | None,
) -> CalendarSlot:
slot = await sess.get(CalendarSlot, slot_id)
if not slot:
raise SlotError("slot not found")
slot.description = description or None
await sess.flush()
return slot

View File

@@ -0,0 +1,115 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarEntry
@dataclass
class VisibleEntries:
"""
Result of applying calendar visibility rules for a given period.
"""
user_entries: list[CalendarEntry]
confirmed_entries: list[CalendarEntry]
admin_other_entries: list[CalendarEntry]
merged_entries: list[CalendarEntry] # sorted, deduped
async def get_visible_entries_for_period(
sess: AsyncSession,
calendar_id: int,
period_start: datetime,
period_end: datetime,
user: Optional[object],
session_id: Optional[str],
) -> VisibleEntries:
"""
Visibility rules (same as your fixed month view):
- Non-admin:
- sees all *confirmed* entries in the period (any user)
- sees all entries for current user/session in the period (any state)
- Admin:
- sees all confirmed + provisional + ordered entries in the period (all users)
- sees pending only for current user/session
"""
user_id = user.id if user else None
is_admin = bool(user and getattr(user, "is_admin", False))
# --- Entries for current user/session (any state, in period) ---
user_entries: list[CalendarEntry] = []
if user_id or session_id:
conditions_user = [
CalendarEntry.calendar_id == calendar_id,
CalendarEntry.deleted_at.is_(None),
CalendarEntry.start_at >= period_start,
CalendarEntry.start_at < period_end,
]
if user_id:
conditions_user.append(CalendarEntry.user_id == user_id)
elif session_id:
conditions_user.append(CalendarEntry.session_id == session_id)
result_user = await sess.execute(select(CalendarEntry).where(*conditions_user))
user_entries = result_user.scalars().all()
# --- Confirmed entries for everyone in period ---
result_conf = await sess.execute(
select(CalendarEntry).where(
CalendarEntry.calendar_id == calendar_id,
CalendarEntry.state == "confirmed",
CalendarEntry.deleted_at.is_(None),
CalendarEntry.start_at >= period_start,
CalendarEntry.start_at < period_end,
)
)
confirmed_entries = result_conf.scalars().all()
# --- For admins: ordered + provisional for everyone in period ---
admin_other_entries: list[CalendarEntry] = []
if is_admin:
result_admin = await sess.execute(
select(CalendarEntry).where(
CalendarEntry.calendar_id == calendar_id,
CalendarEntry.state.in_(("ordered", "provisional")),
CalendarEntry.deleted_at.is_(None),
CalendarEntry.start_at >= period_start,
CalendarEntry.start_at < period_end,
)
)
admin_other_entries = result_admin.scalars().all()
# --- Merge with de-duplication and keep chronological order ---
entries_by_id: dict[int, CalendarEntry] = {}
# Everyone's confirmed
for e in confirmed_entries:
entries_by_id[e.id] = e
# Admin-only: everyone's ordered/provisional
if is_admin:
for e in admin_other_entries:
entries_by_id[e.id] = e
# Always include current user/session entries (includes their pending)
for e in user_entries:
entries_by_id[e.id] = e
merged_entries = sorted(
entries_by_id.values(),
key=lambda e: e.start_at or period_start,
)
return VisibleEntries(
user_entries=user_entries,
confirmed_entries=confirmed_entries,
admin_other_entries=admin_other_entries,
merged_entries=merged_entries,
)

View File

@@ -0,0 +1,226 @@
from __future__ import annotations
from datetime import datetime, timezone
from decimal import Decimal
from quart import (
request, render_template, make_response, Blueprint, g, redirect, url_for, jsonify
)
from sqlalchemy import update
from models.calendars import CalendarEntry
from .services.entries import (
add_entry as svc_add_entry,
)
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from suma_browser.app.bp.calendar_entry.routes import register as register_calendar_entry
from models.calendars import CalendarSlot
from sqlalchemy import select
def calculate_entry_cost(slot: CalendarSlot, start_at: datetime, end_at: datetime) -> Decimal:
"""
Calculate cost for an entry based on slot and time range.
- Fixed slot: use slot cost
- Flexible slot: prorate based on actual time vs slot time range
"""
if not slot.cost:
return Decimal('0')
if not slot.flexible:
# Fixed slot: full cost
return Decimal(str(slot.cost))
# Flexible slot: calculate ratio
if not slot.time_end or not start_at or not end_at:
return Decimal('0')
# Calculate durations in minutes
slot_start_minutes = slot.time_start.hour * 60 + slot.time_start.minute
slot_end_minutes = slot.time_end.hour * 60 + slot.time_end.minute
slot_duration = slot_end_minutes - slot_start_minutes
actual_start_minutes = start_at.hour * 60 + start_at.minute
actual_end_minutes = end_at.hour * 60 + end_at.minute
actual_duration = actual_end_minutes - actual_start_minutes
if slot_duration <= 0 or actual_duration <= 0:
return Decimal('0')
ratio = Decimal(actual_duration) / Decimal(slot_duration)
return Decimal(str(slot.cost)) * ratio
def register():
bp = Blueprint("calendar_entries", __name__, url_prefix='/entries')
bp.register_blueprint(
register_calendar_entry()
)
@bp.post("/")
@clear_cache(tag="calendars", tag_scope="all")
async def add_entry(year: int, month: int, day: int, **kwargs):
form = await request.form
def parse_time_to_dt(value: str | None, year: int, month: int, day: int):
if not value:
return None
try:
hour_str, minute_str = value.split(":", 1)
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
except Exception:
return None
name = (form.get("name") or "").strip()
start_at = parse_time_to_dt(form.get("start_time"), year, month, day)
end_at = parse_time_to_dt(form.get("end_time"), year, month, day)
# NEW: slot_id
slot_id_raw = (form.get("slot_id") or "").strip()
slot_id = int(slot_id_raw) if slot_id_raw else None
# Ticket configuration
ticket_price_str = (form.get("ticket_price") or "").strip()
ticket_price = None
if ticket_price_str:
try:
ticket_price = Decimal(ticket_price_str)
except Exception:
pass
ticket_count_str = (form.get("ticket_count") or "").strip()
ticket_count = None
if ticket_count_str:
try:
ticket_count = int(ticket_count_str)
except Exception:
pass
field_errors: dict[str, list[str]] = {}
# Basic checks
if not name:
field_errors.setdefault("name", []).append("Please enter a name for the entry.")
# Check slot first before validating times
slot = None
cost = Decimal('10') # default cost
if slot_id is not None:
result = await g.s.execute(
select(CalendarSlot).where(
CalendarSlot.id == slot_id,
CalendarSlot.calendar_id == g.calendar.id,
CalendarSlot.deleted_at.is_(None),
)
)
slot = result.scalar_one_or_none()
if slot is None:
field_errors.setdefault("slot_id", []).append(
"Selected slot is no longer available."
)
else:
# For inflexible slots, override the times with slot times
if not slot.flexible:
# Replace start/end with slot times
start_at = datetime(year, month, day,
slot.time_start.hour,
slot.time_start.minute,
tzinfo=timezone.utc)
if slot.time_end:
end_at = datetime(year, month, day,
slot.time_end.hour,
slot.time_end.minute,
tzinfo=timezone.utc)
else:
# Flexible: validate times are within slot band
# Only validate if times were provided
if not start_at:
field_errors.setdefault("start_time", []).append("Please select a start time.")
if end_at is None:
field_errors.setdefault("end_time", []).append("Please select an end time.")
if start_at and end_at:
s_time = start_at.timetz()
e_time = end_at.timetz()
slot_start = slot.time_start
slot_end = slot.time_end
if s_time.replace(tzinfo=None) < slot_start:
field_errors.setdefault("start_time", []).append(
f"Start time must be at or after {slot_start.strftime('%H:%M')}."
)
if slot_end is not None and e_time.replace(tzinfo=None) > slot_end:
field_errors.setdefault("end_time", []).append(
f"End time must be at or before {slot_end.strftime('%H:%M')}."
)
# Calculate cost based on slot and times
if start_at and end_at:
cost = calculate_entry_cost(slot, start_at, end_at)
else:
field_errors.setdefault("slot_id", []).append(
"Please select a slot."
)
# Time ordering check (only if we have times)
if start_at and end_at and end_at < start_at:
field_errors.setdefault("end_time", []).append("End time must be after the start time.")
if field_errors:
return jsonify(
{
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}
), 422
# Pass slot_id and calculated cost to the service
entry = await svc_add_entry(
g.s,
calendar_id=g.calendar.id,
name=name,
start_at=start_at,
end_at=end_at,
user_id=getattr(g, "user", None).id if getattr(g, "user", None) else None,
session_id=None,
slot_id=slot_id,
cost=cost, # Pass calculated cost
)
# Set ticket configuration
entry.ticket_price = ticket_price
entry.ticket_count = ticket_count
html = await render_template("_types/day/_main_panel.html")
return await make_response(html, 200)
@bp.get("/add/")
async def add_form(day: int, month: int, year: int, **kwargs):
html = await render_template(
"_types/day/_add.html",
)
return await make_response(html)
@bp.get("/add-button/")
async def add_button(day: int, month: int, year: int, **kwargs):
html = await render_template(
"_types/day/_add_button.html",
)
return await make_response(html)
return bp

View File

@@ -0,0 +1,258 @@
from __future__ import annotations
from datetime import datetime
from typing import Optional, Sequence
from decimal import Decimal
from sqlalchemy import select, and_, or_
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import Calendar, CalendarEntry
from datetime import datetime
from suma_browser.app.errors import AppError
class CalendarError(AppError):
"""Base error for calendar service operations."""
status_code = 422
async def add_entry(
sess: AsyncSession,
calendar_id: int,
name: str,
start_at: Optional[datetime],
end_at: Optional[datetime],
user_id: int | None = None,
session_id: str | None = None,
slot_id: int | None = None, # NEW: accept slot_id
cost: Optional[Decimal] = None, # NEW: accept cost
) -> CalendarEntry:
"""
Add an entry to a calendar.
Collects *all* validation errors and raises CalendarError([...])
so the HTMX handler can show them as a list.
"""
errors: list[str] = []
# Normalise
name = (name or "").strip()
# Name validation
if not name:
errors.append("Entry name must not be empty.")
# start_at validation
if start_at is None:
errors.append("Start time is required.")
elif not isinstance(start_at, datetime):
errors.append("Start time is invalid.")
# end_at validation
if end_at is not None and not isinstance(end_at, datetime):
errors.append("End time is invalid.")
# Time ordering (only if we have sensible datetimes)
if isinstance(start_at, datetime) and isinstance(end_at, datetime):
if end_at < start_at:
errors.append("End time must be greater than or equal to the start time.")
# If we have any validation errors, bail out now
if errors:
raise CalendarError(errors, status_code=422)
# Calendar existence (this is more of a 404 than a validation issue)
cal = (
await sess.execute(
select(Calendar).where(
Calendar.id == calendar_id,
Calendar.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if not cal:
# Single-message CalendarError still handled by the same error handler
raise CalendarError(
f"Calendar {calendar_id} does not exist or has been deleted.",
status_code=404,
)
# All good, create the entry
entry = CalendarEntry(
calendar_id=calendar_id,
name=name,
start_at=start_at,
end_at=end_at,
user_id=user_id,
session_id=session_id,
slot_id=slot_id, # NEW: save slot_id
state="pending",
cost=cost if cost is not None else Decimal('10'), # Use provided cost or default
)
sess.add(entry)
await sess.flush()
return entry
async def list_entries(
sess: AsyncSession,
post_id: int,
calendar_slug: str,
from_: Optional[datetime] = None,
to: Optional[datetime] = None,
) -> Sequence[CalendarEntry]:
"""
List entries for a given post's calendar by name.
- Respects soft-deletes (only non-deleted calendar / entries).
- If a time window is provided, returns entries that overlap the window:
- If only from_ is given: entries where end_at is NULL or end_at >= from_
- If only to is given: entries where start_at <= to
- If both given: entries where [start_at, end_at or +inf] overlaps [from_, to]
- Sorted by start_at ascending.
"""
calendar_slug = (calendar_slug or "").strip()
if not calendar_slug:
raise CalendarError("calendar_slug must not be empty.")
cal = (
await sess.execute(
select(Calendar.id)
.where(
Calendar.post_id == post_id,
Calendar.slug == calendar_slug,
Calendar.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if not cal:
# Return empty list instead of raising, so callers can treat absence as "no entries"
return []
# Base filter: not soft-deleted entries of this calendar
filters = [CalendarEntry.calendar_id == cal, CalendarEntry.deleted_at.is_(None)]
# Time window logic
if from_ and to:
# Overlap condition: start <= to AND (end is NULL OR end >= from_)
filters.append(CalendarEntry.start_at <= to)
filters.append(or_(CalendarEntry.end_at.is_(None), CalendarEntry.end_at >= from_))
elif from_:
# Anything that hasn't ended before from_
filters.append(or_(CalendarEntry.end_at.is_(None), CalendarEntry.end_at >= from_))
elif to:
# Anything that has started by 'to'
filters.append(CalendarEntry.start_at <= to)
stmt = (
select(CalendarEntry)
.where(and_(*filters))
.order_by(CalendarEntry.start_at.asc(), CalendarEntry.id.asc())
)
result = await sess.execute(stmt)
entries = list(result.scalars())
# Eagerly load slot relationships
for entry in entries:
await sess.refresh(entry, ['slot'])
return entries
async def svc_update_entry(
sess: AsyncSession,
entry_id: int,
*,
name: str | None = None,
start_at: datetime | None = None,
end_at: datetime | None = None,
user_id: int | None = None,
session_id: str | None = None,
slot_id: int | None = None, # NEW: accept slot_id
cost: Decimal | None = None, # NEW: accept cost
) -> CalendarEntry:
"""
Update an existing CalendarEntry.
- Performs the same validations as add_entry()
- Returns the updated CalendarEntry
- Raises CalendarError([...]) on validation issues
- Raises CalendarError(...) if entry does not exist
"""
# Fetch entry
entry = (
await sess.execute(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if not entry:
raise CalendarError(
f"Entry {entry_id} does not exist or has been deleted.",
status_code=404,
)
errors: list[str] = []
# ----- Validation ----- #
# Name validation only if updating it
if name is not None:
name = name.strip()
if not name:
errors.append("Entry name must not be empty.")
# start_at type validation only if provided
if start_at is not None and not isinstance(start_at, datetime):
errors.append("Start time is invalid.")
# end_at type validation
if end_at is not None and not isinstance(end_at, datetime):
errors.append("End time is invalid.")
# Time ordering
effective_start = start_at if start_at is not None else entry.start_at
effective_end = end_at if end_at is not None else entry.end_at
if isinstance(effective_start, datetime) and isinstance(effective_end, datetime):
if effective_end < effective_start:
errors.append("End time must be greater than or equal to the start time.")
# Validation failures?
if errors:
raise CalendarError(errors, status_code=422)
# ----- Apply Updates ----- #
if name is not None:
entry.name = name
if start_at is not None:
entry.start_at = start_at
if end_at is not None:
entry.end_at = end_at
if user_id is not None:
entry.user_id = user_id
if session_id is not None:
entry.session_id = session_id
if slot_id is not None: # NEW: update slot_id
entry.slot_id = slot_id
if cost is not None: # NEW: update cost
entry.cost = cost
entry.updated_at = datetime.utcnow()
await sess.flush()
return entry

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from quart import (
render_template, make_response, Blueprint
)
from suma_browser.app.authz import require_admin
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
# ---------- Pages ----------
@bp.get("/")
@require_admin
async def admin(entry_id: int, **kwargs):
from suma_browser.app.utils.htmx import is_htmx_request
# Determine which template to use based on request type
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/entry/admin/index.html")
else:
html = await render_template("_types/entry/admin/_oob_elements.html")
return await make_response(html)
return bp

574
bp/calendar_entry/routes.py Normal file
View File

@@ -0,0 +1,574 @@
from __future__ import annotations
from sqlalchemy import select, update
from models.calendars import CalendarEntry, CalendarSlot
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from sqlalchemy import select
from quart import (
request, render_template, make_response, Blueprint, g, jsonify
)
from ..calendar_entries.services.entries import (
svc_update_entry,
CalendarError, # <-- add this if you want to catch it explicitly
)
from .services.post_associations import (
add_post_to_entry,
remove_post_from_entry,
get_entry_posts,
search_posts as svc_search_posts,
)
from datetime import datetime, timezone
import math
import logging
from ..ticket_types.routes import register as register_ticket_types
from .admin.routes import register as register_admin
logger = logging.getLogger(__name__)
def register():
bp = Blueprint("calendar_entry", __name__, url_prefix='/<int:entry_id>')
# Register tickets blueprint
bp.register_blueprint(
register_ticket_types()
)
bp.register_blueprint(
register_admin()
)
@bp.before_request
async def load_entry():
"""Load the calendar entry from the URL parameter."""
entry_id = request.view_args.get("entry_id")
if entry_id:
result = await g.s.execute(
select(CalendarEntry)
.where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
g.entry = result.scalar_one_or_none()
@bp.context_processor
async def inject_entry():
"""Make entry and date parameters available to all templates in this blueprint."""
return {
"entry": getattr(g, "entry", None),
"year": request.view_args.get("year"),
"month": request.view_args.get("month"),
"day": request.view_args.get("day"),
}
async def get_day_nav_oob(year: int, month: int, day: int):
"""Helper to generate OOB update for day entries nav"""
from datetime import datetime, timezone, date, timedelta
from ..calendar.services import get_visible_entries_for_period
from quart import session as qsession
# Get the calendar from g
calendar = getattr(g, "calendar", None)
if not calendar:
return ""
# Build day date
try:
day_date = date(year, month, day)
except (ValueError, TypeError):
return ""
# Period: this day only
period_start = datetime(year, month, day, tzinfo=timezone.utc)
period_end = period_start + timedelta(days=1)
# Identity
user = getattr(g, "user", None)
session_id = qsession.get("calendar_sid")
# Get confirmed entries for this day
visible = await get_visible_entries_for_period(
sess=g.s,
calendar_id=calendar.id,
period_start=period_start,
period_end=period_end,
user=user,
session_id=session_id,
)
# Render OOB template
nav_oob = await render_template(
"_types/day/admin/_nav_entries_oob.html",
confirmed_entries=visible.confirmed_entries,
post=g.post_data["post"],
calendar=calendar,
day_date=day_date,
)
return nav_oob
async def get_post_nav_oob(entry_id: int):
"""Helper to generate OOB update for post entries nav when entry state changes"""
# Get the entry to find associated posts
entry = await g.s.scalar(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
if not entry:
return ""
# Get all posts associated with this entry
from .services.post_associations import get_entry_posts
entry_posts = await get_entry_posts(g.s, entry_id)
# Generate OOB updates for each post's nav
nav_oobs = []
for post in entry_posts:
# Get associated entries for this post
from ..post.services.entry_associations import get_associated_entries
associated_entries = await get_associated_entries(g.s, post.id)
# Load calendars for this post
from models.calendars import Calendar
calendars = (
await g.s.execute(
select(Calendar)
.where(Calendar.post_id == post.id, Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
).scalars().all()
# Render OOB template for this post's nav
nav_oob = await render_template(
"_types/post/admin/_nav_entries_oob.html",
associated_entries=associated_entries,
calendars=calendars,
post=post,
)
nav_oobs.append(nav_oob)
return "".join(nav_oobs)
@bp.context_processor
async def inject_root():
view_args = getattr(request, "view_args", {}) or {}
entry_id = view_args.get("entry_id")
calendar_entry = None
entry_posts = []
stmt = (
select(CalendarEntry)
.where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None),
)
)
result = await g.s.execute(stmt)
calendar_entry = result.scalar_one_or_none()
# Optional: also ensure it belongs to the current calendar, if g.calendar is set
if calendar_entry is not None and getattr(g, "calendar", None):
if calendar_entry.calendar_id != g.calendar.id:
calendar_entry = None
# Refresh slot relationship if we have a valid entry
if calendar_entry is not None:
await g.s.refresh(calendar_entry, ['slot'])
# Fetch associated posts
entry_posts = await get_entry_posts(g.s, calendar_entry.id)
return {
"entry": calendar_entry,
"entry_posts": entry_posts,
}
@bp.get("/")
@require_admin
async def get(entry_id: int, **rest):
from suma_browser.app.utils.htmx import is_htmx_request
# TODO: Create _main_panel.html and _oob_elements.html for optimized HTMX
# For now, render full template for both HTMX and normal requests
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/entry/index.html",
)
else:
html = await render_template(
"_types/entry/_oob_elements.html",
)
return await make_response(html, 200)
@bp.get("/edit/")
@require_admin
async def get_edit(entry_id: int, **rest):
html = await render_template("_types/entry/_edit.html")
return await make_response(html, 200)
@bp.put("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def put(year: int, month: int, day: int, entry_id: int, **rest):
form = await request.form
def parse_time_to_dt(value: str | None, year: int, month: int, day: int):
"""
'HH:MM' + (year, month, day) -> aware datetime in UTC.
Returns None if empty/invalid.
"""
if not value:
return None
try:
hour_str, minute_str = value.split(":", 1)
hour = int(hour_str)
minute = int(minute_str)
return datetime(year, month, day, hour, minute, tzinfo=timezone.utc)
except Exception:
return None
name = (form.get("name") or "").strip()
start_at = parse_time_to_dt(form.get("start_at"), year, month, day)
end_at = parse_time_to_dt(form.get("end_at"), year, month, day)
# NEW: slot_id
slot_id_raw = (form.get("slot_id") or "").strip()
slot_id = int(slot_id_raw) if slot_id_raw else None
# Ticket configuration
ticket_price_str = (form.get("ticket_price") or "").strip()
ticket_price = None
if ticket_price_str:
try:
from decimal import Decimal
ticket_price = Decimal(ticket_price_str)
except Exception:
pass # Will be validated below if needed
ticket_count_str = (form.get("ticket_count") or "").strip()
ticket_count = None
if ticket_count_str:
try:
ticket_count = int(ticket_count_str)
except Exception:
pass # Will be validated below if needed
field_errors: dict[str, list[str]] = {}
# --- Basic validation (slot-style) -------------------------
if not name:
field_errors.setdefault("name", []).append(
"Please enter a name for the entry."
)
# Check slot first before validating times
slot = None
if slot_id is not None:
result = await g.s.execute(
select(CalendarSlot).where(
CalendarSlot.id == slot_id,
CalendarSlot.calendar_id == g.calendar.id,
CalendarSlot.deleted_at.is_(None),
)
)
slot = result.scalar_one_or_none()
if slot is None:
field_errors.setdefault("slot_id", []).append(
"Selected slot is no longer available."
)
else:
# For inflexible slots, override the times with slot times
if not slot.flexible:
# Replace start/end with slot times
start_at = datetime(year, month, day,
slot.time_start.hour,
slot.time_start.minute,
tzinfo=timezone.utc)
if slot.time_end:
end_at = datetime(year, month, day,
slot.time_end.hour,
slot.time_end.minute,
tzinfo=timezone.utc)
else:
# Flexible: validate times are within slot band
# Only validate if times were provided
if not start_at:
field_errors.setdefault("start_at", []).append(
"Please select a start time."
)
if not end_at:
field_errors.setdefault("end_at", []).append(
"Please select an end time."
)
if start_at and end_at:
s_time = start_at.timetz()
e_time = end_at.timetz()
slot_start = slot.time_start
slot_end = slot.time_end
if s_time.replace(tzinfo=None) < slot_start:
field_errors.setdefault("start_at", []).append(
f"Start time must be at or after {slot_start.strftime('%H:%M')}."
)
if slot_end is not None and e_time.replace(tzinfo=None) > slot_end:
field_errors.setdefault("end_at", []).append(
f"End time must be at or before {slot_end.strftime('%H:%M')}."
)
else:
field_errors.setdefault("slot_id", []).append(
"Please select a slot."
)
# Time ordering check (only if we have times and no slot override)
if start_at and end_at and end_at < start_at:
field_errors.setdefault("end_at", []).append(
"End time must be after the start time."
)
if field_errors:
return jsonify(
{
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}
), 422
# --- Service call & safety net for extra validation -------
try:
entry = await svc_update_entry(
g.s,
entry_id,
name=name,
start_at=start_at,
end_at=end_at,
slot_id=slot_id, # Pass slot_id to service
)
# Update ticket configuration
entry.ticket_price = ticket_price
entry.ticket_count = ticket_count
except CalendarError as e:
# If the service still finds something wrong, surface it nicely.
msg = str(e)
return jsonify(
{
"message": "There was a problem updating the entry.",
"errors": {"__all__": [msg]},
}
), 422
# --- Success: re-render the entry block -------------------
# Get nav OOB update
nav_oob = await get_day_nav_oob(year, month, day)
html = await render_template(
"_types/entry/index.html",
#entry=entry,
)
return await make_response(html + nav_oob, 200)
@bp.post("/confirm/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def confirm_entry(entry_id: int, year: int, month: int, day: int, **rest):
await g.s.execute(
update(CalendarEntry)
.where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "provisional",
)
.values(state="confirmed")
)
await g.s.flush()
# Get nav OOB updates (both day and post navs)
day_nav_oob = await get_day_nav_oob(year, month, day)
post_nav_oob = await get_post_nav_oob(entry_id)
# redirect back to calendar admin or order page as you prefer
html = await render_template("_types/entry/_optioned.html")
return await make_response(html + day_nav_oob + post_nav_oob, 200)
@bp.post("/decline/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def decline_entry(entry_id: int, year: int, month: int, day: int, **rest):
await g.s.execute(
update(CalendarEntry)
.where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "provisional",
)
.values(state="declined")
)
await g.s.flush()
# Get nav OOB updates (both day and post navs)
day_nav_oob = await get_day_nav_oob(year, month, day)
post_nav_oob = await get_post_nav_oob(entry_id)
# redirect back to calendar admin or order page as you prefer
html = await render_template("_types/entry/_optioned.html")
return await make_response(html + day_nav_oob + post_nav_oob, 200)
@bp.post("/provisional/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def provisional_entry(entry_id: int, year: int, month: int, day: int, **rest):
await g.s.execute(
update(CalendarEntry)
.where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "confirmed",
)
.values(state="provisional")
)
await g.s.flush()
# Get nav OOB updates (both day and post navs)
day_nav_oob = await get_day_nav_oob(year, month, day)
post_nav_oob = await get_post_nav_oob(entry_id)
# redirect back to calendar admin or order page as you prefer
html = await render_template("_types/entry/_optioned.html")
return await make_response(html + day_nav_oob + post_nav_oob, 200)
@bp.post("/tickets/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def update_tickets(entry_id: int, **rest):
"""Update ticket configuration for a calendar entry"""
from .services.ticket_operations import update_ticket_config
from decimal import Decimal
form = await request.form
# Parse ticket price
ticket_price_str = (form.get("ticket_price") or "").strip()
ticket_price = None
if ticket_price_str:
try:
ticket_price = Decimal(ticket_price_str)
except Exception:
return await make_response("Invalid ticket price", 400)
# Parse ticket count
ticket_count_str = (form.get("ticket_count") or "").strip()
ticket_count = None
if ticket_count_str:
try:
ticket_count = int(ticket_count_str)
except Exception:
return await make_response("Invalid ticket count", 400)
# Update ticket configuration
success, error = await update_ticket_config(
g.s, entry_id, ticket_price, ticket_count
)
if not success:
return await make_response(error, 400)
await g.s.flush()
# Return updated entry view
html = await render_template("_types/entry/index.html")
return await make_response(html, 200)
@bp.get("/posts/search/")
@require_admin
async def search_posts(entry_id: int, **rest):
"""Search for posts to associate with this entry"""
query = request.args.get("q", "").strip()
page = int(request.args.get("page", 1))
per_page = 10
search_posts, total = await svc_search_posts(g.s, query, page, per_page)
total_pages = math.ceil(total / per_page) if total > 0 else 0
html = await render_template(
"_types/entry/_post_search_results.html",
search_posts=search_posts,
search_query=query,
page=page,
total_pages=total_pages,
)
return await make_response(html, 200)
@bp.post("/posts/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def add_post(entry_id: int, **rest):
"""Add a post association to this entry"""
form = await request.form
post_id = form.get("post_id")
if not post_id:
return await make_response("Post ID is required", 400)
try:
post_id = int(post_id)
except ValueError:
return await make_response("Invalid post ID", 400)
success, error = await add_post_to_entry(g.s, entry_id, post_id)
if not success:
return await make_response(error, 400)
await g.s.flush()
# Reload entry_posts for nav update
entry_posts = await get_entry_posts(g.s, entry_id)
# Return updated posts list + OOB nav update
html = await render_template("_types/entry/_posts.html")
nav_oob = await render_template(
"_types/entry/admin/_nav_posts_oob.html",
entry_posts=entry_posts,
)
return await make_response(html + nav_oob, 200)
@bp.delete("/posts/<int:post_id>/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def remove_post(entry_id: int, post_id: int, **rest):
"""Remove a post association from this entry"""
success, error = await remove_post_from_entry(g.s, entry_id, post_id)
if not success:
return await make_response(error or "Association not found", 404)
await g.s.flush()
# Reload entry_posts for nav update
entry_posts = await get_entry_posts(g.s, entry_id)
# Return updated posts list + OOB nav update
html = await render_template("_types/entry/_posts.html")
nav_oob = await render_template(
"_types/entry/admin/_nav_posts_oob.html",
entry_posts=entry_posts,
)
return await make_response(html + nav_oob, 200)
return bp

View File

@@ -0,0 +1,137 @@
from __future__ import annotations
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from sqlalchemy.sql import func
from models.calendars import CalendarEntry, CalendarEntryPost
from models.ghost_content import Post
async def add_post_to_entry(
session: AsyncSession,
entry_id: int,
post_id: int
) -> tuple[bool, str | None]:
"""
Associate a post with a calendar entry.
Returns (success, error_message).
"""
# Check if entry exists
entry = await session.scalar(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
if not entry:
return False, "Calendar entry not found"
# Check if post exists
post = await session.scalar(
select(Post).where(Post.id == post_id)
)
if not post:
return False, "Post not found"
# Check if association already exists
existing = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.post_id == post_id,
CalendarEntryPost.deleted_at.is_(None)
)
)
if existing:
return False, "Post is already associated with this entry"
# Create association
association = CalendarEntryPost(
entry_id=entry_id,
post_id=post_id
)
session.add(association)
await session.flush()
return True, None
async def remove_post_from_entry(
session: AsyncSession,
entry_id: int,
post_id: int
) -> tuple[bool, str | None]:
"""
Remove a post association from a calendar entry (soft delete).
Returns (success, error_message).
"""
# Find the association
association = await session.scalar(
select(CalendarEntryPost).where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.post_id == post_id,
CalendarEntryPost.deleted_at.is_(None)
)
)
if not association:
return False, "Association not found"
# Soft delete
association.deleted_at = func.now()
await session.flush()
return True, None
async def get_entry_posts(
session: AsyncSession,
entry_id: int
) -> list[Post]:
"""
Get all posts associated with a calendar entry.
"""
result = await session.execute(
select(Post)
.join(CalendarEntryPost)
.where(
CalendarEntryPost.entry_id == entry_id,
CalendarEntryPost.deleted_at.is_(None)
)
.order_by(Post.title)
)
return list(result.scalars().all())
async def search_posts(
session: AsyncSession,
query: str,
page: int = 1,
per_page: int = 10
) -> tuple[list[Post], int]:
"""
Search for posts by title with pagination.
If query is empty, returns all posts in published order.
Returns (posts, total_count).
"""
# Build base query
if query:
# Search by title
count_stmt = select(func.count(Post.id)).where(Post.title.ilike(f"%{query}%"))
posts_stmt = select(Post).where(Post.title.ilike(f"%{query}%")).order_by(Post.title)
else:
# All posts in published order (newest first)
count_stmt = select(func.count(Post.id))
posts_stmt = select(Post).order_by(Post.published_at.desc().nullslast())
# Count total
count_result = await session.execute(count_stmt)
total = count_result.scalar() or 0
# Get paginated results
offset = (page - 1) * per_page
result = await session.execute(
posts_stmt.limit(per_page).offset(offset)
)
return list(result.scalars().all()), total

View File

@@ -0,0 +1,87 @@
from __future__ import annotations
from typing import Optional
from decimal import Decimal
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarEntry
async def update_ticket_config(
session: AsyncSession,
entry_id: int,
ticket_price: Optional[Decimal],
ticket_count: Optional[int],
) -> tuple[bool, Optional[str]]:
"""
Update ticket configuration for a calendar entry.
Args:
session: Database session
entry_id: Calendar entry ID
ticket_price: Price per ticket (None = no tickets)
ticket_count: Total available tickets (None = unlimited)
Returns:
(success, error_message)
"""
# Get the entry
entry = await session.scalar(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
if not entry:
return False, "Calendar entry not found"
# Validate inputs
if ticket_price is not None and ticket_price < 0:
return False, "Ticket price cannot be negative"
if ticket_count is not None and ticket_count < 0:
return False, "Ticket count cannot be negative"
# Update ticket configuration
entry.ticket_price = ticket_price
entry.ticket_count = ticket_count
return True, None
async def get_available_tickets(
session: AsyncSession,
entry_id: int,
) -> tuple[Optional[int], Optional[str]]:
"""
Get the number of available tickets for a calendar entry.
Returns:
(available_count, error_message)
- available_count is None if unlimited tickets
- available_count is the remaining count if limited
"""
entry = await session.scalar(
select(CalendarEntry).where(
CalendarEntry.id == entry_id,
CalendarEntry.deleted_at.is_(None)
)
)
if not entry:
return None, "Calendar entry not found"
# If no ticket configuration, return None (unlimited)
if entry.ticket_price is None:
return None, None
# If ticket_count is None, unlimited tickets
if entry.ticket_count is None:
return None, None
# TODO: Subtract booked tickets when ticket booking is implemented
# For now, just return the total count
return entry.ticket_count, None

98
bp/calendars/routes.py Normal file
View File

@@ -0,0 +1,98 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g
)
from sqlalchemy import select
from models.calendars import Calendar
from .services.calendars import (
create_calendar as svc_create_calendar,
)
from ..calendar.routes import register as register_calendar
from suma_browser.app.redis_cacher import cache_page, clear_cache
from suma_browser.app.authz import require_admin
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("calendars", __name__, url_prefix='/calendars')
bp.register_blueprint(
register_calendar(),
)
@bp.context_processor
async def inject_root():
# Must always return a dict
return {}
# ---------- Pages ----------
@bp.get("/")
@cache_page(tag="calendars")
async def home(**kwargs):
if not is_htmx_request():
html = await render_template(
"_types/calendars/index.html",
)
else:
html = await render_template(
"_types/calendars/_oob_elements.html",
)
return await make_response(html)
@bp.post("/new/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def create_calendar(**kwargs):
form = await request.form
name = (form.get("name") or "").strip()
# Get post_id from context if available (blog-embedded mode)
post_data = getattr(g, "post_data", None)
post_id = (post_data.get("post") or {}).get("id") if post_data else None
if not post_id:
# Standalone mode: post_id from form (or None — calendar without post)
post_id = form.get("post_id")
if post_id:
post_id = int(post_id)
try:
await svc_create_calendar(g.s, post_id, name)
except Exception as e:
return await make_response(f'<div class="text-red-600 text-sm">{e}</div>', 422)
html = await render_template(
"_types/calendars/index.html",
)
# Blog-embedded mode: also update post nav
if post_data:
from ..post.services.entry_associations import get_associated_entries
cals = (
await g.s.execute(
select(Calendar)
.where(Calendar.post_id == post_id, Calendar.deleted_at.is_(None))
.order_by(Calendar.name.asc())
)
).scalars().all()
associated_entries = await get_associated_entries(g.s, post_id)
nav_oob = await render_template(
"_types/post/admin/_nav_entries_oob.html",
associated_entries=associated_entries,
calendars=cals,
post=post_data["post"],
)
html = html + nav_oob
return await make_response(html)
return bp

View File

@@ -0,0 +1,104 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import Calendar
from models.ghost_content import Post # for FK existence checks
import unicodedata
import re
class CalendarError(ValueError):
"""Base error for calendar service operations."""
from suma_browser.app.utils import (
utcnow
)
def slugify(value: str, max_len: int = 255) -> str:
"""
Make a URL-friendly slug:
- lowercase
- remove accents
- replace any non [a-z0-9]+ with '-'
- no forward slashes
- collapse multiple dashes
- trim leading/trailing dashes
"""
if value is None:
value = ""
# normalize accents -> ASCII
value = unicodedata.normalize("NFKD", value)
value = value.encode("ascii", "ignore").decode("ascii")
value = value.lower()
# explicitly block forward slashes
value = value.replace("/", "-")
# replace non-alnum with hyphen
value = re.sub(r"[^a-z0-9]+", "-", value)
# collapse multiple hyphens
value = re.sub(r"-{2,}", "-", value)
# trim hyphens and enforce length
value = value.strip("-")[:max_len].strip("-")
# fallback if empty
return value or "calendar"
async def soft_delete(sess: AsyncSession, post_slug: str, calendar_slug: str) -> bool:
cal = (
await sess.execute(
select(Calendar)
.join(Post, Calendar.post_id == Post.id)
.where(
Post.slug == post_slug,
Calendar.slug == calendar_slug,
Calendar.deleted_at.is_(None),
)
)
).scalar_one_or_none()
if not cal:
return False
cal.deleted_at = utcnow()
await sess.flush()
return True
async def create_calendar(sess: AsyncSession, post_id: int, name: str) -> Calendar:
"""
Create a calendar for a post. Name must be unique per post.
If a calendar with the same (post_id, name) exists but is soft-deleted,
it will be revived (deleted_at=None).
"""
name = (name or "").strip()
if not name:
raise CalendarError("Calendar name must not be empty.")
slug=slugify(name)
# Ensure post exists (avoid silent FK errors in some DBs)
post = (await sess.execute(select(Post.id).where(Post.id == post_id))).scalar_one_or_none()
if not post:
raise CalendarError(f"Post {post_id} does not exist.")
# Look for existing (including soft-deleted)
q = await sess.execute(
select(Calendar).where(Calendar.post_id == post_id, Calendar.name == name)
)
existing = q.scalar_one_or_none()
if existing:
if existing.deleted_at is not None:
existing.deleted_at = None # revive
await sess.flush()
return existing
raise CalendarError(f'Calendar with slug "{slug}" already exists for post {post_id}.')
cal = Calendar(post_id=post_id, name=name, slug=slug)
sess.add(cal)
await sess.flush()
return cal

28
bp/day/admin/routes.py Normal file
View File

@@ -0,0 +1,28 @@
from __future__ import annotations
from quart import (
render_template, make_response, Blueprint
)
from suma_browser.app.authz import require_admin
def register():
bp = Blueprint("admin", __name__, url_prefix='/admin')
# ---------- Pages ----------
@bp.get("/")
@require_admin
async def admin(year: int, month: int, day: int, **kwargs):
from suma_browser.app.utils.htmx import is_htmx_request
# Determine which template to use based on request type
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/day/admin/index.html")
else:
html = await render_template("_types/day/admin/_oob_elements.html")
return await make_response(html)
return bp

121
bp/day/routes.py Normal file
View File

@@ -0,0 +1,121 @@
from __future__ import annotations
from datetime import datetime, timezone, date, timedelta
from quart import (
request, render_template, make_response, Blueprint, g, abort, session as qsession
)
from suma_browser.app.bp.calendar.services import get_visible_entries_for_period
from suma_browser.app.bp.calendar_entries.routes import register as register_calendar_entries
from .admin.routes import register as register_admin
from suma_browser.app.redis_cacher import cache_page
from models.calendars import CalendarSlot # add this import
from sqlalchemy import select
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("day", __name__, url_prefix='/day/<int:year>/<int:month>/<int:day>')
bp.register_blueprint(
register_calendar_entries()
)
bp.register_blueprint(
register_admin()
)
@bp.context_processor
async def inject_root():
view_args = getattr(request, "view_args", {}) or {}
day = view_args.get("day")
month = view_args.get("month")
year = view_args.get("year")
calendar = getattr(g, "calendar", None)
if not calendar:
return {}
try:
day_date = date(year, month, day)
except (ValueError, TypeError):
return {}
# Period: this day only
period_start = datetime(year, month, day, tzinfo=timezone.utc)
period_end = period_start + timedelta(days=1)
# Identity & admin flag
user = getattr(g, "user", None)
session_id = qsession.get("calendar_sid")
visible = await get_visible_entries_for_period(
sess=g.s,
calendar_id=calendar.id,
period_start=period_start,
period_end=period_end,
user=user,
session_id=session_id,
)
# --- NEW: slots for this weekday ---
weekday_attr = ["mon","tue","wed","thu","fri","sat","sun"][day_date.weekday()]
stmt = (
select(CalendarSlot)
.where(
CalendarSlot.calendar_id == calendar.id,
getattr(CalendarSlot, weekday_attr) == True, # noqa: E712
CalendarSlot.deleted_at.is_(None),
)
.order_by(CalendarSlot.time_start.asc(), CalendarSlot.id.asc())
)
result = await g.s.execute(stmt)
day_slots = list(result.scalars())
return {
"qsession": qsession,
"day_date": day_date,
"day": day,
"year": year,
"month": month,
"day_entries": visible.merged_entries,
"user_entries": visible.user_entries,
"confirmed_entries": visible.confirmed_entries,
"day_slots": day_slots, # <-- NEW
}
@bp.get("/")
@cache_page(tag="calendars")
async def show_day(year: int, month: int, day: int, **kwargs):
"""
Show a detail view for a single calendar day.
Visibility rules:
- Non-admin:
- all *confirmed* entries for that day (any user)
- all entries for current user/session (any state) for that day
(pending/ordered/provisional/confirmed)
- Admin:
- all confirmed + provisional + ordered entries for that day (all users)
- pending only for current user/session
"""
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/day/index.html",
)
else:
html = await render_template(
"_types/day/_oob_elements.html",
)
return await make_response(html)
return bp

182
bp/slot/routes.py Normal file
View File

@@ -0,0 +1,182 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g, jsonify
)
from sqlalchemy.exc import IntegrityError
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from .services.slot import (
update_slot as svc_update_slot,
soft_delete_slot as svc_delete_slot,
get_slot as svc_get_slot,
)
from ..slots.services.slots import (
list_slots as svc_list_slots,
)
from suma_browser.app.utils import (
parse_time,
parse_cost
)
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("slot", __name__, url_prefix='/<int:slot_id>')
# ---------- Pages ----------
@bp.get("/")
@require_admin
async def get(slot_id: int, **kwargs):
slot = await svc_get_slot(g.s, slot_id)
if not slot:
return await make_response("Not found", 404)
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/slot/index.html",
slot=slot,
)
else:
html = await render_template(
"_types/slot/_oob_elements.html",
slot=slot,
)
return await make_response(html)
@bp.get("/edit/")
@require_admin
async def get_edit(slot_id: int, **kwargs):
slot = await svc_get_slot(g.s, slot_id)
if not slot:
return await make_response("Not found", 404)
html = await render_template(
"_types/slot/_edit.html",
slot=slot,
#post=g.post_data['post'],
#calendar=g.calendar,
)
return await make_response(html)
@bp.get("/view/")
@require_admin
async def get_view(slot_id: int, **kwargs):
slot = await svc_get_slot(g.s, slot_id)
if not slot:
return await make_response("Not found", 404)
html = await render_template(
"_types/slot/_main_panel.html",
slot=slot,
#post=g.post_data['post'],
#calendar=g.calendar,
)
return await make_response(html)
@bp.delete("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def slot_delete(slot_id: int, **kwargs):
await svc_delete_slot(g.s, slot_id)
slots = await svc_list_slots(g.s, g.calendar.id)
html = await render_template("_types/slots/_man_panel.html", calendar=g.calendar, slots=slots)
return await make_response(html)
@bp.put("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def put(slot_id: int, **kwargs):
form = await request.form
name = (form.get("name") or "").strip()
description = (form.get("description") or "").strip() or None
days = {k: form.get(k) for k in ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]}
time_start = parse_time(form.get("time_start"))
time_end = parse_time(form.get("time_end"))
cost = parse_cost(form.get("cost"))
# NEW
flexible = bool(form.get("flexible"))
field_errors: dict[str, list[str]] = {}
# Basic validation...
if not name:
field_errors.setdefault("name", []).append("Please enter a name for the slot.")
if not time_start:
field_errors.setdefault("time_start", []).append("Please select a start time.")
if not time_end:
field_errors.setdefault("time_end", []).append("Please select an end time.")
if time_start and time_end and time_end <= time_start:
field_errors.setdefault("time_end", []).append(
"End time must be after the start time."
)
if not any(form.get(d) for d in ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]):
field_errors.setdefault("days", []).append(
"Please select at least one day."
)
if field_errors:
return jsonify(
{
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}
), 422
# DB update + friendly duplicate handling
try:
slot = await svc_update_slot(
g.s,
slot_id,
name=name,
description=description,
days=days,
time_start=time_start,
time_end=time_end,
cost=cost,
flexible=flexible, # <--- NEW
)
except IntegrityError as e:
msg = str(e.orig) if getattr(e, "orig", None) else str(e)
if "uq_calendar_slots_unique_band" in msg or "duplicate key value" in msg:
field_errors = {
"name": [f'A slot called “{name}” already exists on this calendar.']
}
return jsonify(
{
"message": "That slot name is already in use.",
"errors": field_errors,
}
), 422
return jsonify(
{
"message": "An unexpected error occurred while updating the slot.",
"errors": {"__all__": [msg]},
}
), 422
html = await render_template(
"_types/slot/_main_panel.html",
slot=slot,
oob=True,
)
return await make_response(html)
return bp

90
bp/slot/services/slot.py Normal file
View File

@@ -0,0 +1,90 @@
from __future__ import annotations
from datetime import time
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarSlot
class SlotError(ValueError):
pass
def _b(v):
if isinstance(v, bool):
return v
s = str(v).lower()
return s in {"1","true","t","yes","y","on"}
async def update_slot(
sess: AsyncSession,
slot_id: int,
*,
name: str | None = None,
description: str | None = None,
days: dict | None = None,
time_start: time | None = None,
time_end: time | None = None,
cost: float | None = None,
flexible: bool | None = None, # NEW
):
slot = await sess.get(CalendarSlot, slot_id)
if not slot or slot.deleted_at is not None:
raise SlotError("slot not found")
if name is not None:
slot.name = name
if description is not None:
slot.description = description or None
if days is not None:
slot.mon = _b(days.get("mon", slot.mon))
slot.tue = _b(days.get("tue", slot.tue))
slot.wed = _b(days.get("wed", slot.wed))
slot.thu = _b(days.get("thu", slot.thu))
slot.fri = _b(days.get("fri", slot.fri))
slot.sat = _b(days.get("sat", slot.sat))
slot.sun = _b(days.get("sun", slot.sun))
if time_start is not None:
slot.time_start = time_start
if time_end is not None:
slot.time_end = time_end
if (time_start or time_end) and slot.time_end <= slot.time_start:
raise SlotError("time range invalid")
if cost is not None:
slot.cost = cost
# NEW: update flexible flag only if explicitly provided
if flexible is not None:
slot.flexible = flexible
await sess.flush()
return slot
async def soft_delete_slot(sess: AsyncSession, slot_id: int):
slot = await sess.get(CalendarSlot, slot_id)
if not slot or slot.deleted_at is not None:
return
from datetime import datetime, timezone
slot.deleted_at = datetime.now(timezone.utc)
await sess.flush()
async def get_slot(sess: AsyncSession, slot_id: int) -> CalendarSlot | None:
return await sess.get(CalendarSlot, slot_id)
async def update_slot_description(
sess: AsyncSession,
slot_id: int,
description: str | None,
) -> CalendarSlot:
slot = await sess.get(CalendarSlot, slot_id)
if not slot:
raise SlotError("slot not found")
slot.description = description or None
await sess.flush()
return slot

152
bp/slots/routes.py Normal file
View File

@@ -0,0 +1,152 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g, jsonify
)
from sqlalchemy.exc import IntegrityError
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from .services.slots import (
list_slots as svc_list_slots,
create_slot as svc_create_slot,
)
from ..slot.routes import register as register_slot
from suma_browser.app.utils import (
parse_time,
parse_cost
)
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("slots", __name__, url_prefix='/slots')
# ---------- Pages ----------
bp.register_blueprint(
register_slot()
)
@bp.context_processor
async def get_slots():
calendar = getattr(g, "calendar", None)
if calendar:
return {
"slots": await svc_list_slots(g.s, calendar.id)
}
return {"slots": []}
@bp.get("/")
async def get(**kwargs):
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/slots/index.html",
)
else:
html = await render_template(
"_types/slots/_oob_elements.html",
)
return await make_response(html)
@bp.post("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def post(**kwargs):
form = await request.form
name = (form.get("name") or "").strip()
description = (form.get("description") or "").strip() or None
days = {k: form.get(k) for k in ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]}
time_start = parse_time(form.get("time_start"))
time_end = parse_time(form.get("time_end"))
cost = parse_cost(form.get("cost"))
# NEW: flexible flag from checkbox
flexible = bool(form.get("flexible"))
field_errors: dict[str, list[str]] = {}
if not name:
field_errors.setdefault("name", []).append("Please enter a name for the slot.")
if not time_start:
field_errors.setdefault("time_start", []).append("Please select a start time.")
if not time_end:
field_errors.setdefault("time_end", []).append("Please select an end time.")
if time_start and time_end and time_end <= time_start:
field_errors.setdefault("time_end", []).append("End time must be after the start time.")
if not any(form.get(d) for d in ["mon", "tue", "wed", "thu", "fri", "sat", "sun"]):
field_errors.setdefault("days", []).append("Please select at least one day.")
if field_errors:
return jsonify({
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}), 422
# DB insert with friendly duplicate detection
try:
await svc_create_slot(
g.s,
g.calendar.id,
name=name,
description=description,
days=days,
time_start=time_start,
time_end=time_end,
cost=cost,
flexible=flexible, # <<< NEW
)
except IntegrityError as e:
# Improve duplicate detection: check constraint name or message
msg = str(e.orig) if getattr(e, "orig", None) else str(e)
if "uq_calendar_slots_unique_band" in msg or "duplicate key value" in msg:
field_errors = {
"name": [f"A slot called “{name}” already exists on this calendar."]
}
return jsonify({
"message": "That slot name is already in use.",
"errors": field_errors,
}), 422
# Unknown DB error
return jsonify({
"message": "An unexpected error occurred while saving the slot.",
"errors": {"__all__": [msg]},
}), 422
# Success → re-render the slots table
html = await render_template("_types/slots/_main_panel.html")
return await make_response(html)
@bp.get("/add")
@require_admin
async def add_form(**kwargs):
html = await render_template(
"_types/slots/_add.html",
)
return await make_response(html)
@bp.get("/add-button")
@require_admin
async def add_button(**kwargs):
html = await render_template(
"_types/slots/_add_button.html",
)
return await make_response(html)
return bp

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
from datetime import time
from typing import Sequence
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import CalendarSlot
class SlotError(ValueError):
pass
def _b(v):
if isinstance(v, bool):
return v
s = str(v).lower()
return s in {"1","true","t","yes","y","on"}
async def list_slots(sess: AsyncSession, calendar_id: int) -> Sequence[CalendarSlot]:
res = await sess.execute(
select(CalendarSlot)
.where(CalendarSlot.calendar_id == calendar_id, CalendarSlot.deleted_at.is_(None))
.order_by(CalendarSlot.time_start.asc(), CalendarSlot.id.asc())
)
return res.scalars().all()
async def create_slot(
sess: AsyncSession,
calendar_id: int,
*,
name: str,
description: str | None,
days: dict,
time_start: time,
time_end: time,
cost: float | None,
flexible: bool = False, # NEW
):
if not name:
raise SlotError("name is required")
if not time_start or not time_end or time_end <= time_start:
raise SlotError("time range invalid")
slot = CalendarSlot(
calendar_id=calendar_id,
name=name,
description=(description or None),
mon=_b(days.get("mon")),
tue=_b(days.get("tue")),
wed=_b(days.get("wed")),
thu=_b(days.get("thu")),
fri=_b(days.get("fri")),
sat=_b(days.get("sat")),
sun=_b(days.get("sun")),
time_start=time_start,
time_end=time_end,
cost=cost,
flexible=flexible, # NEW
)
sess.add(slot)
await sess.flush()
return slot

159
bp/ticket_type/routes.py Normal file
View File

@@ -0,0 +1,159 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g, jsonify
)
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from .services.ticket import (
get_ticket_type as svc_get_ticket_type,
update_ticket_type as svc_update_ticket_type,
soft_delete_ticket_type as svc_delete_ticket_type,
)
from ..ticket_types.services.tickets import (
list_ticket_types as svc_list_ticket_types,
)
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("ticket_type", __name__, url_prefix='/<int:ticket_type_id>')
@bp.get("/")
@require_admin
async def get(ticket_type_id: int, **kwargs):
"""View a single ticket type."""
ticket_type = await svc_get_ticket_type(g.s, ticket_type_id)
if not ticket_type:
return await make_response("Not found", 404)
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/ticket_type/index.html",
ticket_type=ticket_type,
)
else:
html = await render_template(
"_types/ticket_type/_oob_elements.html",
ticket_type=ticket_type,
)
return await make_response(html)
@bp.get("/edit/")
@require_admin
async def get_edit(ticket_type_id: int, **kwargs):
"""Show the edit form for a ticket type."""
ticket_type = await svc_get_ticket_type(g.s, ticket_type_id)
if not ticket_type:
return await make_response("Not found", 404)
html = await render_template(
"_types/ticket_type/_edit.html",
ticket_type=ticket_type,
)
return await make_response(html)
@bp.get("/view/")
@require_admin
async def get_view(ticket_type_id: int, **kwargs):
"""Show the view for a ticket type."""
ticket_type = await svc_get_ticket_type(g.s, ticket_type_id)
if not ticket_type:
return await make_response("Not found", 404)
html = await render_template(
"_types/ticket_type/_main_panel.html",
ticket_type=ticket_type,
)
return await make_response(html)
@bp.put("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def put(ticket_type_id: int, **kwargs):
"""Update a ticket type."""
form = await request.form
name = (form.get("name") or "").strip()
cost_str = (form.get("cost") or "").strip()
count_str = (form.get("count") or "").strip()
field_errors: dict[str, list[str]] = {}
# Validate name
if not name:
field_errors.setdefault("name", []).append("Please enter a ticket type name.")
# Validate cost
cost = None
if not cost_str:
field_errors.setdefault("cost", []).append("Please enter a cost.")
else:
try:
cost = float(cost_str)
if cost < 0:
field_errors.setdefault("cost", []).append("Cost must be positive.")
except ValueError:
field_errors.setdefault("cost", []).append("Please enter a valid number.")
# Validate count
count = None
if not count_str:
field_errors.setdefault("count", []).append("Please enter a ticket count.")
else:
try:
count = int(count_str)
if count < 0:
field_errors.setdefault("count", []).append("Count must be positive.")
except ValueError:
field_errors.setdefault("count", []).append("Please enter a valid whole number.")
if field_errors:
return jsonify({
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}), 422
# Update ticket type
ticket_type = await svc_update_ticket_type(
g.s,
ticket_type_id,
name=name,
cost=cost,
count=count,
)
if not ticket_type:
return await make_response("Not found", 404)
# Return updated view with OOB flag
html = await render_template(
"_types/ticket_type/_main_panel.html",
ticket_type=ticket_type,
oob=True,
)
return await make_response(html)
@bp.delete("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def delete(ticket_type_id: int, **kwargs):
"""Soft-delete a ticket type."""
success = await svc_delete_ticket_type(g.s, ticket_type_id)
if not success:
return await make_response("Not found", 404)
# Re-render the ticket types list
ticket_types = await svc_list_ticket_types(g.s, g.entry.id)
html = await render_template(
"_types/ticket_types/_main_panel.html",
ticket_types=ticket_types
)
return await make_response(html)
return bp

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.calendars import TicketType
from datetime import datetime, timezone
def utcnow() -> datetime:
return datetime.now(timezone.utc)
async def get_ticket_type(session: AsyncSession, ticket_type_id: int) -> TicketType | None:
"""Get a single ticket type by ID (only if not soft-deleted)."""
result = await session.execute(
select(TicketType)
.where(
TicketType.id == ticket_type_id,
TicketType.deleted_at.is_(None)
)
)
return result.scalar_one_or_none()
async def update_ticket_type(
session: AsyncSession,
ticket_type_id: int,
*,
name: str,
cost: float,
count: int,
) -> TicketType | None:
"""Update an existing ticket type."""
ticket_type = await get_ticket_type(session, ticket_type_id)
if not ticket_type:
return None
ticket_type.name = name
ticket_type.cost = cost
ticket_type.count = count
ticket_type.updated_at = utcnow()
await session.flush()
return ticket_type
async def soft_delete_ticket_type(session: AsyncSession, ticket_type_id: int) -> bool:
"""Soft-delete a ticket type."""
ticket_type = await get_ticket_type(session, ticket_type_id)
if not ticket_type:
return False
ticket_type.deleted_at = utcnow()
await session.flush()
return True

132
bp/ticket_types/routes.py Normal file
View File

@@ -0,0 +1,132 @@
from __future__ import annotations
from quart import (
request, render_template, make_response, Blueprint, g, jsonify
)
from suma_browser.app.authz import require_admin
from suma_browser.app.redis_cacher import clear_cache
from .services.tickets import (
list_ticket_types as svc_list_ticket_types,
create_ticket_type as svc_create_ticket_type,
)
from ..ticket_type.routes import register as register_ticket_type
from suma_browser.app.utils.htmx import is_htmx_request
def register():
bp = Blueprint("ticket_types", __name__, url_prefix='/ticket-types')
# Register individual ticket routes
bp.register_blueprint(
register_ticket_type()
)
@bp.context_processor
async def get_ticket_types():
"""Make ticket types available to all templates in this blueprint."""
entry = getattr(g, "entry", None)
if entry:
return {
"ticket_types": await svc_list_ticket_types(g.s, entry.id)
}
return {"ticket_types": []}
@bp.get("/")
async def get(**kwargs):
"""List all ticket types for the current entry."""
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/ticket_types/index.html",
)
else:
html = await render_template(
"_types/ticket_types/_oob_elements.html",
)
return await make_response(html)
@bp.post("/")
@require_admin
@clear_cache(tag="calendars", tag_scope="all")
async def post(**kwargs):
"""Create a new ticket type."""
form = await request.form
name = (form.get("name") or "").strip()
cost_str = (form.get("cost") or "").strip()
count_str = (form.get("count") or "").strip()
field_errors: dict[str, list[str]] = {}
# Validate name
if not name:
field_errors.setdefault("name", []).append("Please enter a ticket type name.")
# Validate cost
cost = None
if not cost_str:
field_errors.setdefault("cost", []).append("Please enter a cost.")
else:
try:
cost = float(cost_str)
if cost < 0:
field_errors.setdefault("cost", []).append("Cost must be positive.")
except ValueError:
field_errors.setdefault("cost", []).append("Please enter a valid number.")
# Validate count
count = None
if not count_str:
field_errors.setdefault("count", []).append("Please enter a ticket count.")
else:
try:
count = int(count_str)
if count < 0:
field_errors.setdefault("count", []).append("Count must be positive.")
except ValueError:
field_errors.setdefault("count", []).append("Please enter a valid whole number.")
if field_errors:
return jsonify({
"message": "Please fix the highlighted fields.",
"errors": field_errors,
}), 422
# Create ticket type
await svc_create_ticket_type(
g.s,
g.entry.id,
name=name,
cost=cost,
count=count,
)
# Success → re-render the ticket types table
html = await render_template("_types/ticket_types/_main_panel.html")
return await make_response(html)
@bp.get("/add")
@require_admin
async def add_form(**kwargs):
"""Show the add ticket type form."""
html = await render_template(
"_types/ticket_types/_add.html",
)
return await make_response(html)
@bp.get("/add-button")
@require_admin
async def add_button(**kwargs):
"""Show the add ticket type button."""
html = await render_template(
"_types/ticket_types/_add_button.html",
)
return await make_response(html)
return bp

View File

@@ -0,0 +1,47 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models.calendars import TicketType
from datetime import datetime, timezone
def utcnow() -> datetime:
return datetime.now(timezone.utc)
async def list_ticket_types(session: AsyncSession, entry_id: int) -> list[TicketType]:
"""Get all active ticket types for a calendar entry."""
result = await session.execute(
select(TicketType)
.where(
TicketType.entry_id == entry_id,
TicketType.deleted_at.is_(None)
)
.order_by(TicketType.name)
)
return list(result.scalars().all())
async def create_ticket_type(
session: AsyncSession,
entry_id: int,
*,
name: str,
cost: float,
count: int,
) -> TicketType:
"""Create a new ticket type for a calendar entry."""
ticket_type = TicketType(
entry_id=entry_id,
name=name,
cost=cost,
count=count,
created_at=utcnow(),
updated_at=utcnow(),
)
session.add(ticket_type)
await session.flush()
return ticket_type