"""Cart app data endpoints.

Exposes read-only JSON queries at ``/internal/data/<query_name>`` for
cross-app callers via the internal data client.
"""
from __future__ import annotations

from collections.abc import Awaitable, Callable
from typing import Any

from quart import Blueprint, g, jsonify, request

from shared.infrastructure.data_client import DATA_HEADER
from shared.contracts.dtos import dto_to_dict
from shared.services.registry import services


def register() -> Blueprint:
    """Build the ``data`` blueprint mounted under ``/internal/data``.

    The blueprint exposes a single GET route that dispatches on the
    ``query_name`` path segment to one of the handlers registered in the
    local ``_handlers`` table (``cart-summary`` and ``cart-items``).
    Every request first passes through a guard that answers 403 unless
    the ``DATA_HEADER`` header is present and
    ``validate_internal_request()`` returns a truthy value.

    Returns:
        The configured :class:`quart.Blueprint`.
    """
    bp = Blueprint("data", __name__, url_prefix="/internal/data")

    @bp.before_request
    async def _require_data_header():
        """Reject requests that do not look like internal data calls.

        Returning a response from a ``before_request`` hook short-circuits
        the view; returning ``None`` lets the request continue.
        """
        if not request.headers.get(DATA_HEADER):
            return jsonify({"error": "forbidden"}), 403

        # Imported lazily, as in the original code (presumably to avoid an
        # import cycle at module load -- TODO confirm).
        from shared.infrastructure.internal_auth import validate_internal_request

        if not validate_internal_request():
            return jsonify({"error": "forbidden"}), 403

    # Query name -> zero-argument coroutine function that reads its inputs
    # from ``request.args`` and returns a JSON-serialisable result.
    _handlers: dict[str, Callable[[], Awaitable[Any]]] = {}

    # The route must declare the ``query_name`` URL variable: the view takes
    # it as a required argument, and a bare "/" rule would never supply it.
    @bp.get("/<query_name>")
    async def handle_query(query_name: str):
        """Run the handler registered for ``query_name`` and return JSON.

        Responds with 404 and ``{"error": "unknown query"}`` when no
        handler is registered under that name.
        """
        handler = _handlers.get(query_name)
        if handler is None:
            return jsonify({"error": "unknown query"}), 404
        result = await handler()
        return jsonify(result)

    # --- cart-summary ---
    async def _cart_summary():
        """Return the cart summary DTO as a plain dict.

        Reads ``user_id`` (parsed as int), ``session_id`` and ``page_slug``
        from the query string and forwards them, together with ``g.s``, to
        ``services.cart.cart_summary``.
        """
        user_id = request.args.get("user_id", type=int)
        session_id = request.args.get("session_id")
        page_slug = request.args.get("page_slug")
        summary = await services.cart.cart_summary(
            g.s,
            user_id=user_id,
            session_id=session_id,
            page_slug=page_slug,
        )
        return dto_to_dict(summary)

    _handlers["cart-summary"] = _cart_summary

    # --- cart-items (product slugs + quantities for template rendering) ---
    async def _cart_items():
        """Return the non-deleted cart items for a user or a session.

        ``user_id`` takes precedence over ``session_id``; when neither is
        supplied the result is an empty list. Each item is reduced to its
        ``product_id``, ``product_slug`` and ``quantity``.
        """
        from sqlalchemy import select
        from shared.models.market import CartItem

        user_id = request.args.get("user_id", type=int)
        session_id = request.args.get("session_id")

        # Only rows whose ``deleted_at`` is NULL are returned.
        filters = [CartItem.deleted_at.is_(None)]
        if user_id is not None:
            filters.append(CartItem.user_id == user_id)
        elif session_id is not None:
            filters.append(CartItem.session_id == session_id)
        else:
            # No owner given: return nothing instead of querying every cart.
            return []

        result = await g.s.execute(select(CartItem).where(*filters))
        items = result.scalars().all()
        return [
            {
                "product_id": item.product_id,
                "product_slug": item.product_slug,
                "quantity": item.quantity,
            }
            for item in items
        ]

    _handlers["cart-items"] = _cart_items

    return bp