"""Relations app data endpoints.""" from __future__ import annotations from quart import Blueprint, g, jsonify, request from shared.infrastructure.data_client import DATA_HEADER def register() -> Blueprint: bp = Blueprint("data", __name__, url_prefix="/internal/data") @bp.before_request async def _require_data_header(): if not request.headers.get(DATA_HEADER): return jsonify({"error": "forbidden"}), 403 from shared.infrastructure.internal_auth import validate_internal_request if not validate_internal_request(): return jsonify({"error": "forbidden"}), 403 _handlers: dict[str, object] = {} @bp.get("/") async def handle_query(query_name: str): handler = _handlers.get(query_name) if handler is None: return jsonify({"error": "unknown query"}), 404 result = await handler() return jsonify(result) # --- get-children --- async def _get_children(): """Return ContainerRelation children for a parent.""" from shared.services.relationships import get_children parent_type = request.args.get("parent_type", "") parent_id = request.args.get("parent_id", type=int) child_type = request.args.get("child_type") if not parent_type or parent_id is None: return [] rels = await get_children(g.s, parent_type, parent_id, child_type) return [ { "id": r.id, "parent_type": r.parent_type, "parent_id": r.parent_id, "child_type": r.child_type, "child_id": r.child_id, "sort_order": r.sort_order, "label": r.label, } for r in rels ] _handlers["get-children"] = _get_children return bp