Files
rose-ash/orders/bp/order/routes.py
giles c243d17eeb
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 3m41s
Migrate all apps to defpage declarative page routes
Replace Python GET page handlers with declarative defpage definitions in .sx
files across all 8 apps (sx docs, orders, account, market, cart, federation,
events, blog). Each app now has sxc/pages/ with setup functions, layout
registrations, page helpers, and .sx defpage declarations.

Core infrastructure: add g I/O primitive, PageDef support for auth/layout/
data/content/filter/aside/menu slots, post_author auth level, and custom
layout registration. Remove ~1400 lines of render_*_page/render_*_oob
boilerplate. Update all endpoint references in routes, sx_components, and
templates to defpage_* naming.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-03 14:52:34 +00:00

102 lines
3.8 KiB
Python

from __future__ import annotations
from quart import Blueprint, g, redirect, url_for, make_response
from sqlalchemy import select
from shared.models.order import Order
from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout
from shared.config import config
from shared.infrastructure.cart_identity import current_cart_identity
from services.check_sumup_status import check_sumup_status
from .filters.qs import makeqs_factory, decode
def _owner_filter():
    """Build the ownership clause for the current visitor's orders.

    The current cart identity is consulted for a ``user_id`` first; only
    when that is falsy is ``session_id`` considered.

    Returns:
        A comparison on ``Order.user_id`` or ``Order.session_id`` suitable
        for a ``where()`` clause, or ``None`` when the identity carries
        neither value (callers treat that as "no order can match").
    """
    identity = current_cart_identity()

    user_id = identity["user_id"]
    if user_id:
        return Order.user_id == user_id

    session_id = identity["session_id"]
    if session_id:
        return Order.session_id == session_id

    return None
def register() -> Blueprint:
    """Build the per-order blueprint mounted under ``/<int:order_id>``.

    Routes registered on the blueprint:

    * ``GET /pay/`` -- send the visitor to the SumUp hosted checkout for the
      order, creating a new checkout when no hosted URL is stored yet.
    * ``POST /recheck/`` -- re-check the order's status with SumUp, then
      redirect to the order detail page.

    Both routes only act on an order that matches ``_owner_filter()``;
    anything else is answered with a plain ``404``.

    Returns:
        The configured ``Blueprint``.
    """
    import logging

    logger = logging.getLogger(__name__)
    bp = Blueprint("order", __name__, url_prefix='/<int:order_id>')

    @bp.before_request
    def route():
        # Expose the query-string factory on ``g`` for the rest of the request.
        g.makeqs_factory = makeqs_factory

    async def _load_owned_order(order_id: int):
        """Return the order with this id owned by the current identity, else None."""
        owner = _owner_filter()
        if owner is None:
            # No user or session identity: nothing can be owned, skip the query.
            return None
        result = await g.s.execute(select(Order).where(Order.id == order_id, owner))
        return result.scalar_one_or_none()

    @bp.get("/pay/")
    async def order_pay(order_id: int):
        """Re-open the SumUp payment page for this order."""
        order = await _load_owned_order(order_id)
        if not order:
            return await make_response("Order not found", 404)

        if order.status == "paid":
            return redirect(url_for("orders.defpage_order_detail", order_id=order.id))

        # A hosted checkout URL is already stored: reuse it.
        if order.sumup_hosted_url:
            return redirect(order.sumup_hosted_url)

        redirect_url = url_for("checkout.checkout_return", order_id=order.id, _external=True)
        sumup_cfg = config().get("sumup", {}) or {}
        webhook_secret = sumup_cfg.get("webhook_secret")
        webhook_url = url_for("checkout.checkout_webhook", order_id=order.id, _external=True)
        if webhook_secret:
            from urllib.parse import urlencode

            # Append the configured secret as a ``token`` query parameter.
            sep = "&" if "?" in webhook_url else "?"
            webhook_url = f"{webhook_url}{sep}{urlencode({'token': webhook_secret})}"

        checkout_data = await sumup_create_checkout(
            order,
            redirect_url=redirect_url,
            webhook_url=webhook_url,
        )

        order.sumup_checkout_id = checkout_data.get("id")
        order.sumup_status = checkout_data.get("status")
        # The hosted URL may be nested under "hosted_checkout" or sit at top level.
        hosted_cfg = checkout_data.get("hosted_checkout") or {}
        hosted_url = hosted_cfg.get("hosted_checkout_url") or checkout_data.get("hosted_checkout_url")
        order.sumup_hosted_url = hosted_url
        await g.s.flush()

        if not hosted_url:
            from shared.sx.page import get_template_context
            from sx.sx_components import render_checkout_error_page

            tctx = await get_template_context()
            html = await render_checkout_error_page(
                tctx,
                error="No hosted checkout URL returned from SumUp when trying to reopen payment.",
                order=order,
            )
            return await make_response(html, 500)

        return redirect(hosted_url)

    @bp.post("/recheck/")
    async def order_recheck(order_id: int):
        """Manually re-check this order's status with SumUp."""
        order = await _load_owned_order(order_id)
        if not order:
            return await make_response("Order not found", 404)

        # Without a checkout id there is nothing to re-check.
        if not order.sumup_checkout_id:
            return redirect(url_for("orders.defpage_order_detail", order_id=order.id))

        try:
            await check_sumup_status(g.s, order)
        except Exception:
            # Best-effort: the visitor is still redirected to the detail page,
            # but the failure is logged instead of being silently discarded.
            # The route's ``order_id`` is logged rather than ``order.id`` so the
            # handler does not touch ORM state after a failure.
            logger.exception("SumUp status re-check failed for order %s", order_id)

        return redirect(url_for("orders.defpage_order_detail", order_id=order.id))

    return bp