"""Events app data endpoints. Exposes read-only JSON queries at ``/internal/data/`` for cross-app callers via the internal data client. """ from __future__ import annotations from quart import Blueprint, g, jsonify, request from shared.infrastructure.data_client import DATA_HEADER from shared.contracts.dtos import dto_to_dict from shared.services.registry import services def register() -> Blueprint: bp = Blueprint("data", __name__, url_prefix="/internal/data") @bp.before_request async def _require_data_header(): if not request.headers.get(DATA_HEADER): return jsonify({"error": "forbidden"}), 403 from shared.infrastructure.internal_auth import validate_internal_request if not validate_internal_request(): return jsonify({"error": "forbidden"}), 403 _handlers: dict[str, object] = {} @bp.get("/") async def handle_query(query_name: str): handler = _handlers.get(query_name) if handler is None: return jsonify({"error": "unknown query"}), 404 result = await handler() return jsonify(result) # --- pending-entries --- async def _pending_entries(): user_id = request.args.get("user_id", type=int) session_id = request.args.get("session_id") entries = await services.calendar.pending_entries( g.s, user_id=user_id, session_id=session_id, ) return [dto_to_dict(e) for e in entries] _handlers["pending-entries"] = _pending_entries # --- pending-tickets --- async def _pending_tickets(): user_id = request.args.get("user_id", type=int) session_id = request.args.get("session_id") tickets = await services.calendar.pending_tickets( g.s, user_id=user_id, session_id=session_id, ) return [dto_to_dict(t) for t in tickets] _handlers["pending-tickets"] = _pending_tickets # --- entries-for-page --- async def _entries_for_page(): page_id = request.args.get("page_id", type=int) user_id = request.args.get("user_id", type=int) session_id = request.args.get("session_id") entries = await services.calendar.entries_for_page( g.s, page_id, user_id=user_id, session_id=session_id, ) return [dto_to_dict(e) for e in entries] _handlers["entries-for-page"] = _entries_for_page # --- tickets-for-page --- async def _tickets_for_page(): page_id = request.args.get("page_id", type=int) user_id = request.args.get("user_id", type=int) session_id = request.args.get("session_id") tickets = await services.calendar.tickets_for_page( g.s, page_id, user_id=user_id, session_id=session_id, ) return [dto_to_dict(t) for t in tickets] _handlers["tickets-for-page"] = _tickets_for_page # --- entries-for-order --- async def _entries_for_order(): order_id = request.args.get("order_id", type=int) entries = await services.calendar.get_entries_for_order(g.s, order_id) return [dto_to_dict(e) for e in entries] _handlers["entries-for-order"] = _entries_for_order # --- tickets-for-order --- async def _tickets_for_order(): order_id = request.args.get("order_id", type=int) tickets = await services.calendar.get_tickets_for_order(g.s, order_id) return [dto_to_dict(t) for t in tickets] _handlers["tickets-for-order"] = _tickets_for_order # --- entry-ids-for-content --- async def _entry_ids_for_content(): content_type = request.args.get("content_type", "") content_id = request.args.get("content_id", type=int) ids = await services.calendar.entry_ids_for_content(g.s, content_type, content_id) return list(ids) _handlers["entry-ids-for-content"] = _entry_ids_for_content # --- associated-entries --- async def _associated_entries(): content_type = request.args.get("content_type", "") content_id = request.args.get("content_id", type=int) page = request.args.get("page", 1, type=int) entries, has_more = await services.calendar.associated_entries( g.s, content_type, content_id, page, ) return {"entries": [dto_to_dict(e) for e in entries], "has_more": has_more} _handlers["associated-entries"] = _associated_entries # --- calendars-for-container --- async def _calendars_for_container(): container_type = request.args.get("type", "") container_id = request.args.get("id", type=int) calendars = await services.calendar.calendars_for_container( g.s, container_type, container_id, ) return [dto_to_dict(c) for c in calendars] _handlers["calendars-for-container"] = _calendars_for_container # --- visible-entries-for-period --- async def _visible_entries_for_period(): from datetime import datetime calendar_id = request.args.get("calendar_id", type=int) period_start = datetime.fromisoformat(request.args.get("period_start", "")) period_end = datetime.fromisoformat(request.args.get("period_end", "")) user_id = request.args.get("user_id", type=int) session_id = request.args.get("session_id") # is_admin determined server-side, never from client params is_admin = False entries = await services.calendar.visible_entries_for_period( g.s, calendar_id, period_start, period_end, user_id=user_id, is_admin=is_admin, session_id=session_id, ) return [dto_to_dict(e) for e in entries] _handlers["visible-entries-for-period"] = _visible_entries_for_period return bp