All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 54s
The cart-mini fragment is fetched via HTTP from the cart app, which uses its own DB connection. Without committing first, the cart app sees stale data (no new item). Commit the transaction, start a new one so after_request can still commit cleanly. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
285 lines
9.7 KiB
Python
285 lines
9.7 KiB
Python
from __future__ import annotations
|
|
|
|
from quart import (
|
|
g,
|
|
Blueprint,
|
|
abort,
|
|
redirect,
|
|
render_template,
|
|
make_response,
|
|
)
|
|
from sqlalchemy import select, func, update
|
|
|
|
from models.market import Product, ProductLike
|
|
from ..browse.services.slugs import canonical_html_slug
|
|
from ..browse.services.blacklist.product import is_product_blocked
|
|
from ..browse.services import db_backend as cb
|
|
from ..browse.services import _massage_product
|
|
from shared.utils import host_url
|
|
from shared.browser.app.redis_cacher import cache_page, clear_cache
|
|
from ..cart.services import total
|
|
from .services.product_operations import toggle_product_like, massage_full_product
|
|
|
|
|
|
def register():
|
|
bp = Blueprint("product", __name__, url_prefix="/product/<product_slug>")
|
|
@bp.url_value_preprocessor
|
|
def pull_product_slug(endpoint, values):
|
|
# product_slug is distinct from the app-level "slug"/"page_slug" params,
|
|
# so it won't be popped by the app-level preprocessor in app.py.
|
|
g.product_slug = values.pop("product_slug", None)
|
|
|
|
# ─────────────────────────────────────────────────────────────
|
|
# BEFORE REQUEST: Slug or numeric ID resolver
|
|
# ─────────────────────────────────────────────────────────────
|
|
@bp.before_request
|
|
async def resolve_product():
|
|
from quart import request as req
|
|
|
|
raw_slug = g.product_slug = getattr(g, "product_slug", None)
|
|
if raw_slug is None:
|
|
return
|
|
|
|
is_post = req.method == "POST"
|
|
|
|
# 1. If slug is INT → load product by ID
|
|
if raw_slug.isdigit():
|
|
product_id = int(raw_slug)
|
|
|
|
product = await cb.db_product_full_id(
|
|
g.s, product_id, user_id=g.user.id if g.user else 0
|
|
)
|
|
|
|
if not product:
|
|
abort(404)
|
|
|
|
# If product is deleted → SHOW as-is
|
|
if product["deleted_at"]:
|
|
d = product
|
|
g.item_data = {"d": d, "slug": product["slug"], "liked": False}
|
|
return
|
|
|
|
# Not deleted → redirect to canonical slug (GET only)
|
|
if not is_post:
|
|
canon = canonical_html_slug(product["slug"])
|
|
return redirect(
|
|
host_url(url_for("market.browse.product.product_detail", product_slug=canon))
|
|
)
|
|
|
|
g.item_data = {"d": product, "slug": product["slug"], "liked": False}
|
|
return
|
|
|
|
# 2. Normal slug-based behaviour
|
|
if is_product_blocked(raw_slug):
|
|
abort(404)
|
|
|
|
canon = canonical_html_slug(raw_slug)
|
|
if canon != raw_slug and not is_post:
|
|
return redirect(
|
|
host_url(url_for("market.browse.product.product_detail", product_slug=canon))
|
|
)
|
|
|
|
# hydrate full product
|
|
d = await cb.db_product_full(
|
|
g.s, canon, user_id=g.user.id if g.user else 0
|
|
)
|
|
if not d:
|
|
abort(404)
|
|
g.item_data = {"d": d, "slug": canon, "liked": d.get("is_liked", False)}
|
|
|
|
@bp.context_processor
|
|
def context():
|
|
item_data = getattr(g, "item_data", None)
|
|
|
|
if item_data:
|
|
return {
|
|
**item_data,
|
|
}
|
|
else:
|
|
return {}
|
|
|
|
# ─────────────────────────────────────────────────────────────
|
|
# RENDER PRODUCT
|
|
# ─────────────────────────────────────────────────────────────
|
|
@bp.get("/")
|
|
@cache_page(tag="browse")
|
|
async def product_detail():
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
|
|
# Determine which template to use based on request type
|
|
if not is_htmx_request():
|
|
# Normal browser request: full page with layout
|
|
html = await render_template("_types/product/index.html")
|
|
else:
|
|
# HTMX request: main panel + OOB elements
|
|
html = await render_template("_types/product/_oob_elements.html")
|
|
|
|
return html
|
|
|
|
@bp.post("/like/toggle/")
|
|
@clear_cache(tag="browse", tag_scope="user")
|
|
async def like_toggle():
|
|
product_slug = g.product_slug
|
|
|
|
if not g.user:
|
|
html = await render_template(
|
|
"_types/browse/like/button.html",
|
|
slug=product_slug,
|
|
liked=False,
|
|
)
|
|
resp = make_response(html, 403)
|
|
return resp
|
|
|
|
user_id = g.user.id
|
|
|
|
liked, error = await toggle_product_like(g.s, user_id, product_slug)
|
|
|
|
if error:
|
|
resp = make_response(error, 404)
|
|
return resp
|
|
|
|
html = await render_template(
|
|
"_types/browse/like/button.html",
|
|
slug=product_slug,
|
|
liked=liked,
|
|
)
|
|
return html
|
|
|
|
|
|
|
|
@bp.get("/admin/")
|
|
async def admin():
|
|
from shared.browser.app.utils.htmx import is_htmx_request
|
|
|
|
if not is_htmx_request():
|
|
# Normal browser request: full page with layout
|
|
html = await render_template("_types/product/admin/index.html")
|
|
else:
|
|
# HTMX request: main panel + OOB elements
|
|
html = await render_template("_types/product/admin/_oob_elements.html")
|
|
|
|
return await make_response(html)
|
|
|
|
|
|
from bp.cart.services.identity import current_cart_identity
|
|
#from bp.cart.routes import view_cart
|
|
from models.market import CartItem
|
|
from quart import request, url_for
|
|
|
|
@bp.post("/cart/")
|
|
@clear_cache(tag="browse", tag_scope="user")
|
|
async def cart():
|
|
slug = g.product_slug
|
|
# make sure product exists (we *allow* deleted_at != None later if you want)
|
|
product_id = await g.s.scalar(
|
|
select(Product.id).where(
|
|
Product.slug == slug,
|
|
Product.deleted_at.is_(None),
|
|
)
|
|
)
|
|
|
|
product = await g.s.scalar(
|
|
select(Product).where(Product.id == product_id)
|
|
)
|
|
if not product:
|
|
return await make_response("Product not found", 404)
|
|
|
|
# --- NEW: read `count` from body (JSON or form), default to 1 ---
|
|
count = 1
|
|
try:
|
|
if request.is_json:
|
|
data = await request.get_json()
|
|
if data is not None and "count" in data:
|
|
count = int(data["count"])
|
|
else:
|
|
form = await request.form
|
|
if "count" in form:
|
|
count = int(form["count"])
|
|
except (ValueError, TypeError):
|
|
# if parsing fails, just fall back to 1
|
|
count = 1
|
|
# --- END NEW ---
|
|
|
|
ident = current_cart_identity()
|
|
|
|
# Load cart items for current user/session
|
|
from sqlalchemy.orm import selectinload
|
|
cart_filters = [CartItem.deleted_at.is_(None)]
|
|
if ident["user_id"] is not None:
|
|
cart_filters.append(CartItem.user_id == ident["user_id"])
|
|
else:
|
|
cart_filters.append(CartItem.session_id == ident["session_id"])
|
|
cart_result = await g.s.execute(
|
|
select(CartItem)
|
|
.where(*cart_filters)
|
|
.order_by(CartItem.created_at.desc())
|
|
.options(
|
|
selectinload(CartItem.product),
|
|
selectinload(CartItem.market_place),
|
|
)
|
|
)
|
|
g.cart = list(cart_result.scalars().all())
|
|
|
|
ci = next(
|
|
(item for item in g.cart if item.product_id == product_id),
|
|
None,
|
|
)
|
|
|
|
# --- NEW: set quantity based on `count` ---
|
|
if ci:
|
|
if count > 0:
|
|
ci.quantity = count
|
|
else:
|
|
# count <= 0 → remove from cart entirely
|
|
ci.quantity=0
|
|
g.cart.remove(ci)
|
|
await g.s.delete(ci)
|
|
|
|
else:
|
|
if count > 0:
|
|
ci = CartItem(
|
|
user_id=ident["user_id"],
|
|
session_id=ident["session_id"],
|
|
product_id=product.id,
|
|
product=product,
|
|
quantity=count,
|
|
market_place_id=getattr(g, "market", None) and g.market.id,
|
|
)
|
|
g.cart.append(ci)
|
|
g.s.add(ci)
|
|
# if count <= 0 and no existing item, do nothing
|
|
# --- END NEW ---
|
|
|
|
# no explicit commit; your session middleware should handle it
|
|
|
|
# htmx response: OOB-swap mini cart + product buttons
|
|
if request.headers.get("HX-Request") == "true":
|
|
# Commit so the cart app sees the updated data when we fetch the fragment
|
|
await g.tx.commit()
|
|
g.tx = await g.s.begin()
|
|
|
|
from shared.infrastructure.fragments import fetch_fragment
|
|
from shared.infrastructure.cart_identity import current_cart_identity as _ci
|
|
frag_ident = _ci()
|
|
cart_params = {}
|
|
if frag_ident["user_id"] is not None:
|
|
cart_params["user_id"] = frag_ident["user_id"]
|
|
if frag_ident["session_id"] is not None:
|
|
cart_params["session_id"] = frag_ident["session_id"]
|
|
cart_mini_html = await fetch_fragment("cart", "cart-mini", params=cart_params)
|
|
|
|
return await render_template(
|
|
"_types/product/_added.html",
|
|
cart_mini_html=cart_mini_html,
|
|
cart=g.cart,
|
|
item=ci,
|
|
)
|
|
|
|
# normal POST: go to cart page
|
|
from shared.infrastructure.urls import cart_url
|
|
return redirect(cart_url("/"))
|
|
|
|
|
|
|
|
return bp
|