from __future__ import annotations

from quart import Blueprint, g, redirect, url_for, request
from sqlalchemy import select, func, or_, cast, String, exists
from sqlalchemy.orm import selectinload

from shared.models.order import Order, OrderItem
from shared.infrastructure.http_utils import vary as _vary, current_url_without_page as _current_url_without_page
from shared.infrastructure.cart_identity import current_cart_identity
from bp.order.routes import register as register_order
from .filters.qs import makeqs_factory, decode

# Escape character for LIKE patterns built from user input, so that a literal
# "%" or "_" typed into the search box is not treated as a wildcard.
_LIKE_ESCAPE = "/"


def register(url_prefix: str) -> Blueprint:
    """Create the ``orders`` blueprint mounted at *url_prefix*.

    The blueprint nests the blueprint returned by ``register_order()``,
    installs two ``before_request`` hooks (query-string factory on ``g`` and
    an identity guard) and exposes ``GET /rows``, the paginated rows fragment.

    Args:
        url_prefix: URL prefix passed straight to ``Blueprint``.

    Returns:
        The configured blueprint.
    """
    bp = Blueprint("orders", __name__, url_prefix=url_prefix)
    bp.register_blueprint(register_order())

    ORDERS_PER_PAGE = 10

    @bp.before_request
    def route():
        """Expose the query-string factory on ``g`` for this request."""
        g.makeqs_factory = makeqs_factory

    @bp.before_request
    async def _require_identity():
        """Orders require a logged-in user or at least a cart session.

        Returns a redirect to ``auth.login_form`` when the cart identity has
        neither a ``user_id`` nor a ``session_id``; returns nothing otherwise.
        """
        ident = current_cart_identity()
        if not ident["user_id"] and not ident["session_id"]:
            return redirect(url_for("auth.login_form"))

    @bp.get("/rows")
    async def orders_rows():
        """Pagination endpoint — returns order rows for page > 1.

        Reads ``page`` and ``search`` from ``decode()``, restricts the query
        to orders owned by the current user (or, failing that, the current
        cart session), and responds with one ``order-row-pair`` per order
        followed by either an ``infinite-scroll`` sentinel for the next page
        or an ``order-end-row`` marker on the last page.
        """
        ident = current_cart_identity()
        if ident["user_id"]:
            owner_clause = Order.user_id == ident["user_id"]
        elif ident["session_id"]:
            owner_clause = Order.session_id == ident["session_id"]
        else:
            return redirect(url_for("auth.login_form"))

        q = decode()
        page, search = q.page, q.search
        if page < 1:
            page = 1

        # The same filters drive both the count and the page query so the
        # page arithmetic always matches the rows actually returned.
        filters = [owner_clause]
        if search:
            filters.append(_search_clause(search))

        count_stmt = select(func.count()).select_from(Order).where(*filters)
        total_count = (await g.s.execute(count_stmt)).scalar_one() or 0

        # Ceiling division, with at least one (possibly empty) page.
        total_pages = max(1, (total_count + ORDERS_PER_PAGE - 1) // ORDERS_PER_PAGE)
        if page > total_pages:
            page = total_pages

        stmt = (
            select(Order)
            .where(*filters)
            # created_at alone is not unique: without the id tie-breaker,
            # orders sharing a timestamp may be repeated or skipped across
            # OFFSET/LIMIT pages.
            .order_by(Order.created_at.desc(), Order.id.desc())
            .offset((page - 1) * ORDERS_PER_PAGE)
            .limit(ORDERS_PER_PAGE)
        )
        orders = (await g.s.execute(stmt)).scalars().all()

        # NOTE(review): kept as function-scope imports as in the original —
        # presumably to avoid an import cycle; confirm before hoisting.
        from shared.sx.helpers import sx_response, sx_call
        from shared.utils import route_prefix

        pfx = route_prefix()
        # Build the detail URL for a placeholder id 0 and cut the trailing
        # "0/" off, leaving a prefix that order ids can be appended to.
        detail_prefix = pfx + url_for("defpage_order_detail", order_id=0).rsplit("0/", 1)[0]
        qs_fn = makeqs_factory()
        rows_url = pfx + url_for("orders.orders_rows")

        # Build just the rows fragment (not full table) for infinite scroll
        parts = [
            sx_call("order-row-pair", order=_order_row(o), detail_url_prefix=detail_prefix)
            for o in orders
        ]
        if page < total_pages:
            parts.append(sx_call(
                "infinite-scroll",
                url=rows_url + qs_fn(page=page + 1),
                page=page,
                total_pages=total_pages,
                id_prefix="orders",
                colspan=5,
            ))
        else:
            parts.append(sx_call("order-end-row"))

        sx_src = "(<> " + " ".join(parts) + ")"
        resp = sx_response(sx_src)
        resp.headers["Hx-Push-Url"] = _current_url_without_page()
        return _vary(resp)

    return bp


def _format_created_at(created_at) -> str:
    """Format a creation timestamp as e.g. ``5 Mar 2024, 14:07``.

    Returns an em dash when *created_at* is missing. The day is taken from
    ``.day`` rather than the ``%-d`` strftime flag, which is a platform
    extension and not available everywhere.
    """
    if not created_at:
        return "\u2014"
    return f"{created_at.day} " + created_at.strftime("%b %Y, %H:%M")


def _order_row(order: Order) -> dict[str, object]:
    """Flatten an ``Order`` into the dict handed to ``order-row-pair``."""
    return {
        "id": order.id,
        "status": order.status or "pending",
        "created_at_formatted": _format_created_at(order.created_at),
        "description": order.description or "",
        "currency": order.currency or "GBP",
        "total_formatted": f"{order.total_amount or 0:.2f}",
    }


def _like_pattern(search: str) -> str:
    """Return a ``%...%`` LIKE pattern matching *search* literally.

    Surrounding whitespace is stripped. The escape character itself, ``%``
    and ``_`` are prefixed with ``_LIKE_ESCAPE``; the escape character is
    handled first so the prefixes added for the wildcards are not doubled.
    """
    needle = search.strip()
    for ch in (_LIKE_ESCAPE, "%", "_"):
        needle = needle.replace(ch, _LIKE_ESCAPE + ch)
    return f"%{needle}%"


def _search_clause(search: str):
    """Build an OR search clause across order fields.

    The clause matches when the search text occurs (case-insensitively) in
    the order's status, currency, SumUp checkout id, SumUp status or
    description, or in the title or slug of any of its items. If *search*
    parses as an integer the order id must equal it; otherwise the order id,
    cast to a string, is substring-matched as well.

    Args:
        search: Raw search text from the query string.

    Returns:
        A SQLAlchemy ``or_`` expression suitable for ``.where()``.
    """
    term = _like_pattern(search)

    def matches(column):
        # Every text comparison shares the same pattern and escape character.
        return column.ilike(term, escape=_LIKE_ESCAPE)

    conditions = [
        matches(Order.status),
        matches(Order.currency),
        matches(Order.sumup_checkout_id),
        matches(Order.sumup_status),
        matches(Order.description),
        exists(
            select(1)
            .select_from(OrderItem)
            .where(
                OrderItem.order_id == Order.id,
                or_(
                    matches(OrderItem.product_title),
                    matches(OrderItem.product_slug),
                ),
            )
        ),
    ]

    try:
        search_id = int(search)
    except (TypeError, ValueError):
        # Not a number: fall back to substring-matching the id as text.
        conditions.append(matches(cast(Order.id, String)))
    else:
        conditions.append(Order.id == search_id)

    return or_(*conditions)