feat: initialize cart app with blueprints, templates, and CI
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled

Extract cart, order, and orders blueprints with their service layer,
templates, Dockerfile (APP_MODULE=app:app, IMAGE=cart), entrypoint,
and Gitea CI workflow from the coop monolith.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
giles
2026-02-09 23:17:41 +00:00
commit 967303093d
46 changed files with 2472 additions and 0 deletions

148
bp/cart/api.py Normal file
View File

@@ -0,0 +1,148 @@
"""
Internal JSON API for the cart app.
These endpoints are called by other apps (coop, market) over HTTP.
They are CSRF-exempt because they are server-to-server calls.
"""
from __future__ import annotations
from quart import Blueprint, g, request, jsonify
from sqlalchemy import select, update, func
from sqlalchemy.orm import selectinload
from models.market import CartItem
from models.calendars import CalendarEntry
from suma_browser.app.csrf import csrf_exempt
from shared.cart_identity import current_cart_identity
def register() -> Blueprint:
bp = Blueprint("cart_api", __name__, url_prefix="/internal/cart")
@bp.get("/summary")
@csrf_exempt
async def summary():
"""
Return a lightweight cart summary (count + total) for the
current session/user. Called by coop and market apps to
populate the cart-mini widget without importing cart services.
"""
ident = current_cart_identity()
# --- product cart ---
cart_filters = [CartItem.deleted_at.is_(None)]
if ident["user_id"] is not None:
cart_filters.append(CartItem.user_id == ident["user_id"])
else:
cart_filters.append(CartItem.session_id == ident["session_id"])
result = await g.s.execute(
select(CartItem)
.where(*cart_filters)
.options(selectinload(CartItem.product))
.order_by(CartItem.created_at.desc())
)
cart_items = result.scalars().all()
cart_count = sum(ci.quantity for ci in cart_items)
cart_total = sum(
(ci.product.special_price or ci.product.regular_price or 0) * ci.quantity
for ci in cart_items
if ci.product and (ci.product.special_price or ci.product.regular_price)
)
# --- calendar entries ---
cal_filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if ident["user_id"] is not None:
cal_filters.append(CalendarEntry.user_id == ident["user_id"])
else:
cal_filters.append(CalendarEntry.session_id == ident["session_id"])
cal_result = await g.s.execute(
select(CalendarEntry).where(*cal_filters)
)
cal_entries = cal_result.scalars().all()
calendar_count = len(cal_entries)
calendar_total = sum((e.cost or 0) for e in cal_entries if e.cost is not None)
items = [
{
"slug": ci.product.slug if ci.product else None,
"title": ci.product.title if ci.product else None,
"image": ci.product.image if ci.product else None,
"quantity": ci.quantity,
"price": float(ci.product.special_price or ci.product.regular_price or 0)
if ci.product
else 0,
}
for ci in cart_items
]
return jsonify(
{
"count": cart_count,
"total": float(cart_total),
"calendar_count": calendar_count,
"calendar_total": float(calendar_total),
"items": items,
}
)
@bp.post("/adopt")
@csrf_exempt
async def adopt():
"""
Adopt anonymous cart items + calendar entries for a user.
Called by the coop app after successful login.
Body: {"user_id": int, "session_id": str}
"""
data = await request.get_json() or {}
user_id = data.get("user_id")
session_id = data.get("session_id")
if not user_id or not session_id:
return jsonify({"ok": False, "error": "user_id and session_id required"}), 400
# --- adopt cart items ---
anon_result = await g.s.execute(
select(CartItem).where(
CartItem.deleted_at.is_(None),
CartItem.user_id.is_(None),
CartItem.session_id == session_id,
)
)
anon_items = anon_result.scalars().all()
if anon_items:
# Soft-delete existing user cart
await g.s.execute(
update(CartItem)
.where(CartItem.deleted_at.is_(None), CartItem.user_id == user_id)
.values(deleted_at=func.now())
)
for ci in anon_items:
ci.user_id = user_id
# --- adopt calendar entries ---
await g.s.execute(
update(CalendarEntry)
.where(CalendarEntry.deleted_at.is_(None), CalendarEntry.user_id == user_id)
.values(deleted_at=func.now())
)
cal_result = await g.s.execute(
select(CalendarEntry).where(
CalendarEntry.deleted_at.is_(None),
CalendarEntry.session_id == session_id,
)
)
for entry in cal_result.scalars().all():
entry.user_id = user_id
return jsonify({"ok": True})
return bp

57
bp/cart/login_helper.py Normal file
View File

@@ -0,0 +1,57 @@
# app/cart_merge.py
from __future__ import annotations
from quart import g, session as qsession
from sqlalchemy import select
from typing import Optional
from models.market import CartItem
async def merge_anonymous_cart_into_user(user_id: int) -> None:
"""
When a user logs in, move any anonymous cart (session_id) items onto their user_id.
"""
sid: Optional[str] = qsession.get("cart_sid")
if not sid:
return
# get all anon cart items for this session
anon_items = (
await g.s.execute(
select(CartItem).where(
CartItem.deleted_at.is_(None),
CartItem.session_id == sid,
)
)
).scalars().all()
if not anon_items:
return
# Existing user items keyed by product_id for quick merge
user_items_by_product = {
ci.product_id: ci
for ci in (
await g.s.execute(
select(CartItem).where(
CartItem.deleted_at.is_(None),
CartItem.user_id == user_id,
)
)
).scalars().all()
}
for anon in anon_items:
existing = user_items_by_product.get(anon.product_id)
if existing:
# merge quantities then soft-delete the anon row
existing.quantity += anon.quantity
anon.deleted_at = func.now()
else:
# reassign anonymous cart row to this user
anon.user_id = user_id
anon.session_id = None
# clear the anonymous session id now that it's "claimed"
qsession.pop("cart_sid", None)

236
bp/cart/routes.py Normal file
View File

@@ -0,0 +1,236 @@
# app/bp/cart/routes.py
from __future__ import annotations
from quart import Blueprint, g, request, render_template, redirect, url_for, make_response
from sqlalchemy import select, update
from sqlalchemy.orm import selectinload
from models.market import Product, CartItem
from models.order import Order, OrderItem
from suma_browser.app.payments.sumup import create_checkout as sumup_create_checkout
from .services import (
current_cart_identity,
get_cart,
total,
clear_cart_for_order,
get_calendar_cart_entries, # NEW
calendar_total, # NEW
check_sumup_status
)
from .services.checkout import (
find_or_create_cart_item,
create_order_from_cart,
build_sumup_description,
build_sumup_reference,
build_webhook_url,
validate_webhook_secret,
get_order_with_details,
)
from config import config
from models.calendars import CalendarEntry # NEW
from suma_browser.app.utils.htmx import is_htmx_request
def register(url_prefix: str) -> Blueprint:
bp = Blueprint("cart", __name__, url_prefix=url_prefix)
# NOTE: load_cart moved to shared/cart_loader.py
# and registered in shared/factory.py as an app-level before_request
#@bp.context_processor
#async def inject_root():
# return {
# "total": total,
# "calendar_total": calendar_total, # NEW helper
#
# }
@bp.get("/")
async def view_cart():
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template(
"_types/cart/index.html",
)
else:
html = await render_template(
"_types/cart/_oob_elements.html",
)
return await make_response(html)
@bp.post("/add/<int:product_id>/")
async def add_to_cart(product_id: int):
ident = current_cart_identity()
cart_item = await find_or_create_cart_item(
g.s,
product_id,
ident["user_id"],
ident["session_id"],
)
if not cart_item:
return await make_response("Product not found", 404)
# htmx support (optional)
if request.headers.get("HX-Request") == "true":
return await view_cart()
# normal POST: go to cart page
return redirect(url_for("cart.view_cart"))
@bp.post("/checkout/")
async def checkout():
"""Create an Order from the current cart and redirect to SumUp Hosted Checkout."""
# Build cart
cart = await get_cart(g.s)
calendar_entries = await get_calendar_cart_entries(g.s)
if not cart and not calendar_entries:
return redirect(url_for("cart.view_cart"))
product_total = total(cart) or 0
calendar_amount = calendar_total(calendar_entries) or 0
cart_total = product_total + calendar_amount
if cart_total <= 0:
return redirect(url_for("cart.view_cart"))
# Create order from cart
ident = current_cart_identity()
order = await create_order_from_cart(
g.s,
cart,
calendar_entries,
ident.get("user_id"),
ident.get("session_id"),
product_total,
calendar_amount,
)
# Build SumUp checkout details
redirect_url = url_for("cart.checkout_return", order_id=order.id, _external=True)
order.sumup_reference = build_sumup_reference(order.id)
description = build_sumup_description(cart, order.id)
webhook_base_url = url_for("cart.checkout_webhook", order_id=order.id, _external=True)
webhook_url = build_webhook_url(webhook_base_url)
checkout_data = await sumup_create_checkout(
order,
redirect_url=redirect_url,
webhook_url=webhook_url,
description=description,
)
await clear_cart_for_order(g.s, order)
order.sumup_checkout_id = checkout_data.get("id")
order.sumup_status = checkout_data.get("status")
order.description = checkout_data.get("description")
hosted_cfg = checkout_data.get("hosted_checkout") or {}
hosted_url = hosted_cfg.get("hosted_checkout_url") or checkout_data.get("hosted_checkout_url")
order.sumup_hosted_url = hosted_url
await g.s.flush()
if not hosted_url:
html = await render_template(
"_types/cart/checkout_error.html",
order=order,
error="No hosted checkout URL returned from SumUp.",
)
return await make_response(html, 500)
return redirect(hosted_url)
@bp.post("/checkout/webhook/<int:order_id>/")
async def checkout_webhook(order_id: int):
"""
Webhook endpoint for SumUp CHECKOUT_STATUS_CHANGED events.
Security:
- Optional shared secret in ?token=... (checked against config sumup.webhook_secret)
- We *always* verify the event by calling SumUp's API.
"""
# Optional shared secret check
if not validate_webhook_secret(request.args.get("token")):
return "", 204
try:
payload = await request.get_json()
except Exception:
payload = None
if not isinstance(payload, dict):
return "", 204
if payload.get("event_type") != "CHECKOUT_STATUS_CHANGED":
return "", 204
checkout_id = payload.get("id")
if not checkout_id:
return "", 204
# Look up our order
result = await g.s.execute(select(Order).where(Order.id == order_id))
order = result.scalar_one_or_none()
if not order:
return "", 204
# Make sure the checkout id matches the one we stored
if order.sumup_checkout_id and order.sumup_checkout_id != checkout_id:
return "", 204
# Verify with SumUp
try:
await check_sumup_status(g.s, order)
except Exception:
pass
return "", 204
@bp.get("/checkout/return/<int:order_id>/")
async def checkout_return(order_id: int):
"""Handle the browser returning from SumUp after payment."""
order = await get_order_with_details(g.s, order_id)
if not order:
html = await render_template(
"_types/cart/checkout_return.html",
order=None,
status="missing",
calendar_entries=[],
)
return await make_response(html)
status = (order.status or "pending").lower()
# Optionally refresh status from SumUp
if order.sumup_checkout_id:
try:
await check_sumup_status(g.s, order)
except Exception:
status = status or "pending"
calendar_entries = order.calendar_entries or []
await g.s.flush()
html = await render_template(
"_types/cart/checkout_return.html",
order=order,
status=status,
calendar_entries=calendar_entries,
)
return await make_response(html)
return bp

View File

@@ -0,0 +1,8 @@
from .get_cart import get_cart
from .identity import current_cart_identity
from .total import total
from .clear_cart_for_order import clear_cart_for_order
from .adopt_session_cart_for_user import adopt_session_cart_for_user
from .calendar_cart import get_calendar_cart_entries, calendar_total
from .check_sumup_status import check_sumup_status

View File

@@ -0,0 +1,46 @@
from sqlalchemy import select, update, func
from models.market import CartItem
async def adopt_session_cart_for_user(session, user_id: int, session_id: str | None) -> None:
"""
When a user logs in or registers:
- If there are cart items for this anonymous session, take them over.
- Replace any existing cart items for this user with the anonymous cart.
"""
if not session_id:
return
# 1) Find anonymous cart items for this session
result = await session.execute(
select(CartItem)
.where(
CartItem.deleted_at.is_(None),
CartItem.user_id.is_(None),
CartItem.session_id == session_id,
)
)
anon_items = result.scalars().all()
if not anon_items:
# nothing to adopt
return
# 2) Soft-delete any existing cart for this user
await session.execute(
update(CartItem)
.where(
CartItem.deleted_at.is_(None),
CartItem.user_id == user_id,
)
.values(deleted_at=func.now())
)
# 3) Reassign anonymous cart items to the user
for ci in anon_items:
ci.user_id = user_id
# optional: you can keep the session_id as well, but user_id will take precedence
# ci.session_id = session_id
# No explicit commit here; caller's transaction will handle it

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models.calendars import CalendarEntry
from .identity import current_cart_identity
async def get_calendar_cart_entries(session):
"""
Return all *pending* calendar entries for the current cart identity
(user or anonymous session).
"""
ident = current_cart_identity()
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if ident["user_id"] is not None:
filters.append(CalendarEntry.user_id == ident["user_id"])
else:
filters.append(CalendarEntry.session_id == ident["session_id"])
result = await session.execute(
select(CalendarEntry)
.where(*filters)
.order_by(CalendarEntry.start_at.asc())
.options(
selectinload(CalendarEntry.calendar),
)
)
return result.scalars().all()
def calendar_total(entries) -> float:
"""
Total cost of pending calendar entries.
"""
return sum(
(e.cost or 0)
for e in entries
if e.cost is not None
)

View File

@@ -0,0 +1,35 @@
from suma_browser.app.payments.sumup import get_checkout as sumup_get_checkout
from sqlalchemy import update
from models.calendars import CalendarEntry # NEW
async def check_sumup_status(session, order):
checkout_data = await sumup_get_checkout(order.sumup_checkout_id)
order.sumup_status = checkout_data.get("status") or order.sumup_status
sumup_status = (order.sumup_status or "").upper()
if sumup_status == "PAID":
if order.status != "paid":
order.status = "paid"
filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "ordered",
CalendarEntry.order_id==order.id,
]
if order.user_id is not None:
filters.append(CalendarEntry.user_id == order.user_id)
elif order.session_id is not None:
filters.append(CalendarEntry.session_id == order.session_id)
await session.execute(
update(CalendarEntry)
.where(*filters)
.values(state="provisional")
)
# also clear cart for this user/session if it wasn't already
elif sumup_status == "FAILED":
order.status = "failed"
else:
order.status = sumup_status.lower() or order.status
await g.s.flush()

View File

@@ -0,0 +1,182 @@
from __future__ import annotations
from typing import Optional
from urllib.parse import urlencode
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from models.market import Product, CartItem
from models.order import Order, OrderItem
from models.calendars import CalendarEntry
from config import config
async def find_or_create_cart_item(
session: AsyncSession,
product_id: int,
user_id: Optional[int],
session_id: Optional[str],
) -> Optional[CartItem]:
"""
Find an existing cart item for this product/identity, or create a new one.
Returns None if the product doesn't exist.
Increments quantity if item already exists.
"""
# Make sure product exists
product = await session.scalar(
select(Product).where(Product.id == product_id)
)
if not product:
return None
# Look for existing cart item
filters = [
CartItem.deleted_at.is_(None),
CartItem.product_id == product_id,
]
if user_id is not None:
filters.append(CartItem.user_id == user_id)
else:
filters.append(CartItem.session_id == session_id)
existing = await session.scalar(select(CartItem).where(*filters))
if existing:
existing.quantity += 1
return existing
else:
cart_item = CartItem(
user_id=user_id,
session_id=session_id,
product_id=product.id,
quantity=1,
)
session.add(cart_item)
return cart_item
async def create_order_from_cart(
session: AsyncSession,
cart: list[CartItem],
calendar_entries: list[CalendarEntry],
user_id: Optional[int],
session_id: Optional[str],
product_total: float,
calendar_total: float,
) -> Order:
"""
Create an Order and OrderItems from the current cart + calendar entries.
Returns the created Order.
"""
cart_total = product_total + calendar_total
# Determine currency from first product
first_product = cart[0].product if cart else None
currency = (first_product.regular_price_currency if first_product else None) or "GBP"
# Create order
order = Order(
user_id=user_id,
session_id=session_id,
status="pending",
currency=currency,
total_amount=cart_total,
)
session.add(order)
await session.flush()
# Create order items from cart
for ci in cart:
price = ci.product.special_price or ci.product.regular_price or 0
oi = OrderItem(
order=order,
product_id=ci.product.id,
product_title=ci.product.title,
quantity=ci.quantity,
unit_price=price,
currency=currency,
)
session.add(oi)
# Update calendar entries to reference this order
calendar_filters = [
CalendarEntry.deleted_at.is_(None),
CalendarEntry.state == "pending",
]
if order.user_id is not None:
calendar_filters.append(CalendarEntry.user_id == order.user_id)
elif order.session_id is not None:
calendar_filters.append(CalendarEntry.session_id == order.session_id)
await session.execute(
update(CalendarEntry)
.where(*calendar_filters)
.values(
state="ordered",
order_id=order.id,
)
)
return order
def build_sumup_description(cart: list[CartItem], order_id: int) -> str:
"""Build a human-readable description for SumUp checkout."""
titles = [ci.product.title for ci in cart if ci.product and ci.product.title]
item_count = sum(ci.quantity for ci in cart)
if titles:
if len(titles) <= 3:
summary = ", ".join(titles)
else:
summary = ", ".join(titles[:3]) + f" + {len(titles) - 3} more"
else:
summary = "order items"
return f"Order {order_id} ({item_count} item{'s' if item_count != 1 else ''}): {summary}"
def build_sumup_reference(order_id: int) -> str:
"""Build a SumUp reference with configured prefix."""
sumup_cfg = config().get("sumup", {}) or {}
prefix = sumup_cfg.get("checkout_reference_prefix", "")
return f"{prefix}{order_id}"
def build_webhook_url(base_url: str) -> str:
"""Add webhook secret token to URL if configured."""
sumup_cfg = config().get("sumup", {}) or {}
webhook_secret = sumup_cfg.get("webhook_secret")
if webhook_secret:
sep = "&" if "?" in base_url else "?"
return f"{base_url}{sep}{urlencode({'token': webhook_secret})}"
return base_url
def validate_webhook_secret(token: Optional[str]) -> bool:
"""Validate webhook token against configured secret."""
sumup_cfg = config().get("sumup", {}) or {}
webhook_secret = sumup_cfg.get("webhook_secret")
if not webhook_secret:
return True # No secret configured, allow all
return token is not None and token == webhook_secret
async def get_order_with_details(session: AsyncSession, order_id: int) -> Optional[Order]:
"""Fetch an order with items and calendar entries eagerly loaded."""
result = await session.execute(
select(Order)
.options(
selectinload(Order.items).selectinload(OrderItem.product),
selectinload(Order.calendar_entries),
)
.where(Order.id == order_id)
)
return result.scalar_one_or_none()

View File

@@ -0,0 +1,27 @@
from sqlalchemy import update, func
from models.market import CartItem
from models.order import Order
# ...
# helper function near the top of the file (outside register())
async def clear_cart_for_order(session, order: Order) -> None:
"""
Soft-delete all CartItem rows belonging to this order's user_id/session_id.
Called when an order is marked as paid.
"""
filters = [CartItem.deleted_at.is_(None)]
if order.user_id is not None:
filters.append(CartItem.user_id == order.user_id)
if order.session_id is not None:
filters.append(CartItem.session_id == order.session_id)
if len(filters) == 1:
# no user_id/session_id on order nothing to clear
return
await session.execute(
update(CartItem)
.where(*filters)
.values(deleted_at=func.now())
)

View File

@@ -0,0 +1,24 @@
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from models.market import CartItem
from .identity import current_cart_identity
async def get_cart(session):
ident = current_cart_identity()
filters = [CartItem.deleted_at.is_(None)]
if ident["user_id"] is not None:
filters.append(CartItem.user_id == ident["user_id"])
else:
filters.append(CartItem.session_id == ident["session_id"])
result = await session.execute(
select(CartItem)
.where(*filters)
.order_by(CartItem.created_at.desc())
.options(
selectinload(CartItem.product), # <-- important bit
)
)
return result.scalars().all()

View File

@@ -0,0 +1,4 @@
# Re-export from canonical shared location
from shared.cart_identity import CartIdentity, current_cart_identity
__all__ = ["CartIdentity", "current_cart_identity"]

View File

@@ -0,0 +1,7 @@
def total(cart):
return sum(
(item.product.special_price or item.product.regular_price) * item.quantity
for item in cart
if (item.product.special_price or item.product.regular_price) is not None
)

74
bp/order/filters/qs.py Normal file
View File

@@ -0,0 +1,74 @@
# suma_browser/app/bp/order/filters/qs.py
from quart import request
from typing import Iterable, Optional, Union
from suma_browser.app.filters.qs_base import KEEP, build_qs
from suma_browser.app.filters.query_types import OrderQuery
def decode() -> OrderQuery:
"""
Decode current query string into an OrderQuery(page, search).
"""
try:
page = int(request.args.get("page", 1) or 1)
except ValueError:
page = 1
search = request.args.get("search") or None
return OrderQuery(page, search)
def makeqs_factory():
"""
Build a makeqs(...) that starts from the current filters + page.
Behaviour:
- If filters change and you don't explicitly pass page,
the page is reset to 1 (same pattern as browse/blog).
- You can clear search with search=None.
"""
q = decode()
base_search = q.search or None
base_page = int(q.page or 1)
def makeqs(
*,
clear_filters: bool = False,
search: Union[str, None, object] = KEEP,
page: Union[int, None, object] = None,
extra: Optional[Iterable[tuple]] = None,
leading_q: bool = True,
) -> str:
filters_changed = False
# --- search logic ---
if search is KEEP and not clear_filters:
final_search = base_search
else:
filters_changed = True
final_search = (search or None)
# --- page logic ---
if page is None:
final_page = 1 if filters_changed else base_page
else:
final_page = page
# --- build params ---
params: list[tuple[str, str]] = []
if final_search:
params.append(("search", final_search))
if final_page is not None:
params.append(("page", str(final_page)))
if extra:
for k, v in extra:
if v is not None:
params.append((k, str(v)))
return build_qs(params, leading_q=leading_q)
return makeqs

137
bp/order/routes.py Normal file
View File

@@ -0,0 +1,137 @@
from __future__ import annotations
from quart import Blueprint, g, render_template, redirect, url_for, make_response
from sqlalchemy import select, func, or_, cast, String, exists
from sqlalchemy.orm import selectinload
from models.market import Product
from models.order import Order, OrderItem
from suma_browser.app.payments.sumup import create_checkout as sumup_create_checkout
from config import config
from shared.http_utils import vary as _vary, current_url_without_page as _current_url_without_page
from suma_browser.app.bp.cart.services import check_sumup_status
from suma_browser.app.utils.htmx import is_htmx_request
from .filters.qs import makeqs_factory, decode
def register() -> Blueprint:
bp = Blueprint("order", __name__, url_prefix='/<int:order_id>')
ORDERS_PER_PAGE = 10 # keep in sync with browse page size / your preference
@bp.before_request
def route():
# this is the crucial bit for the |qs filter
g.makeqs_factory = makeqs_factory
@bp.get("/")
async def order_detail(order_id: int):
"""
Show a single order + items.
"""
result = await g.s.execute(
select(Order)
.options(
selectinload(Order.items).selectinload(OrderItem.product)
)
.where(Order.id == order_id)
)
order = result.scalar_one_or_none()
if not order:
return await make_response("Order not found", 404)
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/order/index.html", order=order,)
else:
# HTMX navigation (page 1): main panel + OOB elements
html = await render_template("_types/order/_oob_elements.html", order=order,)
return await make_response(html)
@bp.get("/pay/")
async def order_pay(order_id: int):
"""
Re-open the SumUp payment page for this order.
If already paid, just go back to the order detail.
If not, (re)create a SumUp checkout and redirect.
"""
result = await g.s.execute(select(Order).where(Order.id == order_id))
order = result.scalar_one_or_none()
if not order:
return await make_response("Order not found", 404)
if order.status == "paid":
# Already paid; nothing to pay
return redirect(url_for("orders.order.order_detail", order_id=order.id))
# Prefer to reuse existing hosted URL if we have one
if order.sumup_hosted_url:
return redirect(order.sumup_hosted_url)
# Otherwise, create a fresh checkout for this order
redirect_url = url_for("cart.checkout_return", order_id=order.id, _external=True)
sumup_cfg = config().get("sumup", {}) or {}
webhook_secret = sumup_cfg.get("webhook_secret")
webhook_url = url_for("cart.checkout_webhook", order_id=order.id, _external=True)
if webhook_secret:
from urllib.parse import urlencode
sep = "&" if "?" in webhook_url else "?"
webhook_url = f"{webhook_url}{sep}{urlencode({'token': webhook_secret})}"
checkout_data = await sumup_create_checkout(
order,
redirect_url=redirect_url,
webhook_url=webhook_url,
)
order.sumup_checkout_id = checkout_data.get("id")
order.sumup_status = checkout_data.get("status")
hosted_cfg = checkout_data.get("hosted_checkout") or {}
hosted_url = hosted_cfg.get("hosted_checkout_url") or checkout_data.get("hosted_checkout_url")
order.sumup_hosted_url = hosted_url
await g.s.flush()
if not hosted_url:
html = await render_template(
"_types/cart/checkout_error.html",
order=order,
error="No hosted checkout URL returned from SumUp when trying to reopen payment.",
)
return await make_response(html, 500)
return redirect(hosted_url)
@bp.post("/recheck/")
async def order_recheck(order_id: int):
"""
Manually re-check this order's status with SumUp.
Useful if the webhook hasn't fired or the user didn't return correctly.
"""
result = await g.s.execute(select(Order).where(Order.id == order_id))
order = result.scalar_one_or_none()
if not order:
return await make_response("Order not found", 404)
# If we don't have a checkout ID yet, nothing to query
if not order.sumup_checkout_id:
return redirect(url_for("orders.order.order_detail", order_id=order.id))
try:
await check_sumup_status(g.s, order)
except Exception:
# In a real app, log the error; here we just fall back to previous status
pass
return redirect(url_for("orders.order.order_detail", order_id=order.id))
return bp

77
bp/orders/filters/qs.py Normal file
View File

@@ -0,0 +1,77 @@
# suma_browser/app/bp/orders/filters/qs.py
from quart import request
from typing import Iterable, Optional, Union
from suma_browser.app.filters.qs_base import KEEP, build_qs
from suma_browser.app.filters.query_types import OrderQuery
def decode() -> OrderQuery:
"""
Decode current query string into an OrderQuery(page, search).
"""
try:
page = int(request.args.get("page", 1) or 1)
except ValueError:
page = 1
search = request.args.get("search") or None
return OrderQuery(page, search)
def makeqs_factory():
"""
Build a makeqs(...) that starts from the current filters + page.
Behaviour:
- If filters change and you don't explicitly pass page,
the page is reset to 1 (same pattern as browse/blog).
- You can clear search with search=None.
"""
q = decode()
base_search = q.search or None
base_page = int(q.page or 1)
def makeqs(
*,
clear_filters: bool = False,
search: Union[str, None, object] = KEEP,
page: Union[int, None, object] = None,
extra: Optional[Iterable[tuple]] = None,
leading_q: bool = True,
) -> str:
filters_changed = False
# --- search logic ---
if search is KEEP and not clear_filters:
final_search = base_search
else:
filters_changed = True
if search is KEEP:
final_search = None
else:
final_search = (search or None)
# --- page logic ---
if page is None:
final_page = 1 if filters_changed else base_page
else:
final_page = page
# --- build params ---
params: list[tuple[str, str]] = []
if final_search:
params.append(("search", final_search))
if final_page is not None:
params.append(("page", str(final_page)))
if extra:
for k, v in extra:
if v is not None:
params.append((k, str(v)))
return build_qs(params, leading_q=leading_q)
return makeqs

139
bp/orders/routes.py Normal file
View File

@@ -0,0 +1,139 @@
from __future__ import annotations
from quart import Blueprint, g, render_template, redirect, url_for, make_response
from sqlalchemy import select, func, or_, cast, String, exists
from sqlalchemy.orm import selectinload
from models.market import Product
from models.order import Order, OrderItem
from suma_browser.app.payments.sumup import create_checkout as sumup_create_checkout
from config import config
from shared.http_utils import vary as _vary, current_url_without_page as _current_url_without_page
from suma_browser.app.bp.cart.services import check_sumup_status
from suma_browser.app.utils.htmx import is_htmx_request
from suma_browser.app.bp import register_order
from .filters.qs import makeqs_factory, decode
def register(url_prefix: str) -> Blueprint:
bp = Blueprint("orders", __name__, url_prefix=url_prefix)
bp.register_blueprint(
register_order(),
)
ORDERS_PER_PAGE = 10 # keep in sync with browse page size / your preference
@bp.before_request
def route():
# this is the crucial bit for the |qs filter
g.makeqs_factory = makeqs_factory
@bp.get("/")
async def list_orders():
# --- decode filters from query string (page + search) ---
q = decode()
page, search = q.page, q.search
# sanity clamp page
if page < 1:
page = 1
# --- build where clause for search ---
where_clause = None
if search:
term = f"%{search.strip()}%"
conditions = [
Order.status.ilike(term),
Order.currency.ilike(term),
Order.sumup_checkout_id.ilike(term),
Order.sumup_status.ilike(term),
Order.description.ilike(term),
]
conditions.append(
exists(
select(1)
.select_from(OrderItem)
.join(Product, Product.id == OrderItem.product_id)
.where(
OrderItem.order_id == Order.id,
or_(
OrderItem.product_title.ilike(term),
Product.title.ilike(term),
Product.description_short.ilike(term),
Product.description_html.ilike(term),
Product.slug.ilike(term),
Product.brand.ilike(term),
),
)
)
)
# allow exact ID match or partial (string) match
try:
search_id = int(search)
except (TypeError, ValueError):
search_id = None
if search_id is not None:
conditions.append(Order.id == search_id)
else:
conditions.append(cast(Order.id, String).ilike(term))
where_clause = or_(*conditions)
# --- total count & total pages (respecting search) ---
count_stmt = select(func.count()).select_from(Order)
if where_clause is not None:
count_stmt = count_stmt.where(where_clause)
total_count_result = await g.s.execute(count_stmt)
total_count = total_count_result.scalar_one() or 0
total_pages = max(1, (total_count + ORDERS_PER_PAGE - 1) // ORDERS_PER_PAGE)
# clamp page if beyond range (just in case)
if page > total_pages:
page = total_pages
# --- paginated orders (respecting search) ---
offset = (page - 1) * ORDERS_PER_PAGE
stmt = (
select(Order)
.order_by(Order.created_at.desc())
.offset(offset)
.limit(ORDERS_PER_PAGE)
)
if where_clause is not None:
stmt = stmt.where(where_clause)
result = await g.s.execute(stmt)
orders = result.scalars().all()
context = {
"orders": orders,
"page": page,
"total_pages": total_pages,
"search": search,
"search_count": total_count, # For search display
}
# Determine which template to use based on request type and pagination
if not is_htmx_request():
# Normal browser request: full page with layout
html = await render_template("_types/orders/index.html", **context)
elif page > 1:
# HTMX pagination: just table rows + sentinel
html = await render_template("_types/orders/_rows.html", **context)
else:
# HTMX navigation (page 1): main panel + OOB elements
html = await render_template("_types/orders/_oob_elements.html", **context)
resp = await make_response(html)
resp.headers["Hx-Push-Url"] = _current_url_without_page()
return _vary(resp)
return bp