Move render functions, layouts, helpers, and utils from __init__.py to sub-modules (renders.py, layouts.py, helpers.py, utils.py). Update all bp route imports to point at sub-modules directly. Each __init__.py is now ≤20 lines of setup + registration. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
161 lines
5.8 KiB
Python
161 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
from quart import Blueprint, g, redirect, url_for, make_response
|
|
from sqlalchemy import select, func, or_, cast, String, exists
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
|
|
from shared.models.order import Order, OrderItem
|
|
from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout
|
|
from shared.config import config
|
|
|
|
from shared.infrastructure.http_utils import vary as _vary, current_url_without_page as _current_url_without_page
|
|
from shared.infrastructure.cart_identity import current_cart_identity
|
|
from bp.cart.services import check_sumup_status
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
from shared.sx.helpers import sx_response
|
|
|
|
from .filters.qs import makeqs_factory, decode
|
|
|
|
|
|
def _owner_filter():
    """Build the ownership clause used to scope order queries.

    Reads the identity returned by ``current_cart_identity()``. A truthy
    ``user_id`` takes precedence and yields a match on ``Order.user_id``;
    otherwise a truthy ``session_id`` yields a match on ``Order.session_id``.

    Returns ``None`` when neither value is set, which callers treat as
    "no order can belong to this request".
    """
    identity = current_cart_identity()

    user_id = identity["user_id"]
    if user_id:
        return Order.user_id == user_id

    session_id = identity["session_id"]
    if session_id:
        return Order.session_id == session_id

    return None
|
|
|
|
|
|
def register() -> Blueprint:
    """Create the ``order`` blueprint, mounted under ``/<int:order_id>``.

    Routes registered on the blueprint:

    * ``GET /``          -- show one order with its items.
    * ``GET /pay/``      -- redirect to the SumUp hosted checkout, creating a
      new checkout when the order has no stored hosted URL.
    * ``POST /recheck/`` -- re-check the order's status with SumUp, then
      redirect to the order detail page.

    Every route answers ``404 "Order not found"`` when the request has no
    cart identity or the order does not match the ownership clause from
    ``_owner_filter()``.

    Returns:
        The configured ``Blueprint``.
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    bp = Blueprint("order", __name__, url_prefix="/<int:order_id>")

    async def _load_owned_order(order_id: int, *, with_items: bool = False) -> Order | None:
        """Return the order with this id if it passes the ownership clause.

        Returns ``None`` when there is no identity to scope by or when no
        matching row exists. With ``with_items=True`` the order's ``items``
        relationship is loaded via ``selectinload``.
        """
        owner = _owner_filter()
        if owner is None:
            return None
        stmt = select(Order).where(Order.id == order_id, owner)
        if with_items:
            stmt = stmt.options(selectinload(Order.items))
        result = await g.s.execute(stmt)
        return result.scalar_one_or_none()

    async def _not_found():
        """Build the 404 response shared by all routes."""
        return await make_response("Order not found", 404)

    @bp.before_request
    def route():
        # this is the crucial bit for the |qs filter
        g.makeqs_factory = makeqs_factory

    @bp.get("/")
    async def order_detail(order_id: int):
        """Show a single order + items.

        Renders the full page for regular requests and an sx response when
        ``is_htmx_request()`` is true.
        """
        order = await _load_owned_order(order_id, with_items=True)
        if order is None:
            return await _not_found()

        # Imported at call time, as in the original code.
        from shared.sx.page import get_template_context
        from sxc.pages.renders import render_order_page, render_order_oob

        ctx = await get_template_context()
        calendar_entries = ctx.get("calendar_entries")

        if is_htmx_request():
            sx_src = await render_order_oob(ctx, order, calendar_entries, url_for)
            return sx_response(sx_src)

        html = await render_order_page(ctx, order, calendar_entries, url_for)
        return await make_response(html)

    @bp.get("/pay/")
    async def order_pay(order_id: int):
        """Re-open the SumUp payment page for this order.

        If already paid, just go back to the order detail.
        If a hosted checkout URL is already stored, redirect to it.
        Otherwise create a fresh SumUp checkout, store its id, status and
        hosted URL on the order, and redirect to the hosted URL. When SumUp
        returns no hosted URL, a checkout error page is rendered with
        status 500.
        """
        order = await _load_owned_order(order_id)
        if order is None:
            return await _not_found()

        if order.status == "paid":
            # Already paid; nothing to pay
            return redirect(url_for("orders.order.order_detail", order_id=order.id))

        # Prefer to reuse existing hosted URL if we have one
        if order.sumup_hosted_url:
            return redirect(order.sumup_hosted_url)

        # Otherwise, create a fresh checkout for this order
        redirect_url = url_for("cart_global.checkout_return", order_id=order.id, _external=True)

        sumup_cfg = config().get("sumup", {}) or {}
        webhook_secret = sumup_cfg.get("webhook_secret")

        webhook_url = url_for("cart_global.checkout_webhook", order_id=order.id, _external=True)
        if webhook_secret:
            from urllib.parse import urlencode

            # Append the secret as a ``token`` query parameter, respecting
            # any query string already present on the webhook URL.
            sep = "&" if "?" in webhook_url else "?"
            webhook_url = f"{webhook_url}{sep}{urlencode({'token': webhook_secret})}"

        checkout_data = await sumup_create_checkout(
            order,
            redirect_url=redirect_url,
            webhook_url=webhook_url,
        )

        order.sumup_checkout_id = checkout_data.get("id")
        order.sumup_status = checkout_data.get("status")

        # The hosted URL may be nested under "hosted_checkout" or sit at the
        # top level of the response; accept either location.
        hosted_cfg = checkout_data.get("hosted_checkout") or {}
        hosted_url = hosted_cfg.get("hosted_checkout_url") or checkout_data.get("hosted_checkout_url")
        order.sumup_hosted_url = hosted_url

        await g.s.flush()

        if not hosted_url:
            from shared.sx.page import get_template_context
            from sxc.pages.renders import render_checkout_error_page

            tctx = await get_template_context()
            html = await render_checkout_error_page(
                tctx,
                error="No hosted checkout URL returned from SumUp when trying to reopen payment.",
                order=order,
            )
            return await make_response(html, 500)

        return redirect(hosted_url)

    @bp.post("/recheck/")
    async def order_recheck(order_id: int):
        """Manually re-check this order's status with SumUp.

        Useful if the webhook hasn't fired or the user didn't return correctly.
        Always ends by redirecting to the order detail page; a failing status
        check is logged and otherwise ignored, leaving the previous status.
        """
        order = await _load_owned_order(order_id)
        if order is None:
            return await _not_found()

        # If we don't have a checkout ID yet, nothing to query
        if order.sumup_checkout_id:
            try:
                await check_sumup_status(g.s, order)
            except Exception:
                # Best effort: keep the previous status, but record the failure
                # instead of discarding it silently. The path parameter is
                # logged rather than an attribute read from ``order``.
                logger.exception("SumUp status re-check failed for order %s", order_id)

        return redirect(url_for("orders.order.order_detail", order_id=order.id))

    return bp
|
|
|