All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 39s
Templates generate URLs without trailing slash via market_url(), but the route required one. POST requests don't get redirected by Werkzeug, causing a 405 MethodNotAllowed. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
251 lines
8.2 KiB
Python
251 lines
8.2 KiB
Python
from __future__ import annotations
|
|
|
|
from quart import (
|
|
g,
|
|
Blueprint,
|
|
abort,
|
|
redirect,
|
|
render_template,
|
|
make_response,
|
|
)
|
|
from sqlalchemy import select, func, update
|
|
|
|
from models.market import Product, ProductLike
|
|
from ..browse.services.slugs import canonical_html_slug
|
|
from ..browse.services.blacklist.product import is_product_blocked
|
|
from ..browse.services import db_backend as cb
|
|
from ..browse.services import _massage_product
|
|
from utils import host_url
|
|
from suma_browser.app.redis_cacher import cache_page, clear_cache
|
|
from ..cart.services import total
|
|
from .services.product_operations import toggle_product_like, massage_full_product
|
|
|
|
|
|
def register():
|
|
bp = Blueprint("product", __name__, url_prefix="/product/<slug>")
|
|
@bp.url_value_preprocessor
def pull_blog(endpoint, values):
    """Copy the ``slug`` URL value onto ``g`` for later request hooks.

    Registered as the blueprint's URL value preprocessor. The value is
    stored as ``g.product_slug`` (``None`` when ``values`` has no ``slug``
    entry), which is what ``resolve_product`` reads.

    Args:
        endpoint: The matched endpoint name (unused here).
        values: The URL values extracted from the matched route.
    """
    slug_value = values.get("slug")
    g.product_slug = slug_value
|
|
|
|
# ─────────────────────────────────────────────────────────────
|
|
# BEFORE REQUEST: Slug or numeric ID resolver
|
|
# ─────────────────────────────────────────────────────────────
|
|
@bp.before_request
async def resolve_product():
    """Resolve ``g.product_slug`` to a product before any view in this blueprint runs.

    Behaviour, in order:

    * No slug on ``g``: do nothing.
    * All-decimal slug: treat it as a product ID and load it via
      ``cb.db_product_full_id``. A missing product aborts with 404. A product
      whose ``deleted_at`` is set is exposed as-is through ``g.item_data``
      (with ``liked`` forced to ``False``). Any other product is redirected
      to its canonical slug URL.
    * Textual slug: abort with 404 if ``is_product_blocked`` says so, redirect
      if the slug is not already canonical, otherwise load the product via
      ``cb.db_product_full`` (404 when missing) and store it in
      ``g.item_data``.

    Returns:
        A redirect response when the URL is not canonical, otherwise ``None``
        so that request handling continues to the view.
    """
    raw_slug = g.product_slug = getattr(g, "product_slug", None)
    if raw_slug is None:
        return None

    # Anonymous visitors are passed to the DB helpers as user_id 0.
    user_id = g.user.id if g.user else 0

    # 1. Numeric slug -> load product by ID.
    # isdecimal() rather than isdigit(): isdigit() also accepts characters
    # such as superscript digits, which int() rejects with ValueError.
    if raw_slug.isdecimal():
        product = await cb.db_product_full_id(g.s, int(raw_slug), user_id=user_id)
        if not product:
            abort(404)

        # Deleted product -> show as-is under the numeric URL, no redirect.
        if product["deleted_at"]:
            g.item_data = {"d": product, "slug": product["slug"], "liked": False}
            return None

        # Live product -> redirect to its canonical slug URL.
        # The endpoint is blueprint-relative so the redirect works whatever
        # name (or nesting) this blueprint was registered under.
        canon = canonical_html_slug(product["slug"])
        return redirect(host_url(url_for(".product_detail", slug=canon)))

    # 2. Normal slug-based behaviour.
    if is_product_blocked(raw_slug):
        abort(404)

    canon = canonical_html_slug(raw_slug)
    if canon != raw_slug:
        return redirect(host_url(url_for(".product_detail", slug=canon)))

    # Hydrate the full product for the views and the context processor.
    d = await cb.db_product_full(g.s, canon, user_id=user_id)
    if not d:
        abort(404)
    g.item_data = {"d": d, "slug": canon, "liked": d["is_liked"]}
    return None
|
|
|
|
@bp.context_processor
def context():
    """Expose the resolved product data to templates rendered by this blueprint.

    Returns:
        A shallow copy of ``g.item_data`` when it is set and non-empty,
        otherwise an empty dict.
    """
    item_data = getattr(g, "item_data", None)
    return dict(item_data) if item_data else {}
|
|
|
|
# ─────────────────────────────────────────────────────────────
|
|
# RENDER PRODUCT
|
|
# ─────────────────────────────────────────────────────────────
|
|
@bp.get("/")
@cache_page(tag="browse")
async def product_detail(slug: str):
    """Render the product page for the current request.

    Args:
        slug: Product slug from the blueprint URL prefix (not read directly
            in this function).

    Returns:
        The rendered HTML: the full page for a normal browser request, or
        the main panel plus OOB elements for an HTMX request.
    """
    from suma_browser.app.utils.htmx import is_htmx_request

    # HTMX requests get the partial; everything else gets the full layout.
    template_name = (
        "_types/product/_oob_elements.html"
        if is_htmx_request()
        else "_types/product/index.html"
    )
    return await render_template(template_name)
|
|
|
|
@bp.post("/like/toggle/")
@clear_cache(tag="browse", tag_scope="user")
async def like_toggle(slug):
    """Toggle the current user's like on the product and return the like button.

    Args:
        slug: Product slug from the blueprint URL prefix
            (``url_prefix="/product/<slug>"``).

    Returns:
        * 403 with the button rendered as not-liked when nobody is logged in.
        * 404 with the error value when ``toggle_product_like`` reports one.
        * Otherwise the re-rendered button HTML reflecting the new state.
    """
    button_template = "_types/browse/like/button.html"

    if not g.user:
        html = await render_template(button_template, slug=slug, liked=False)
        # make_response is a coroutine in Quart and must be awaited;
        # returning it un-awaited hands the framework a coroutine object
        # instead of a response.
        return await make_response(html, 403)

    liked, error = await toggle_product_like(g.s, g.user.id, slug)
    if error:
        return await make_response(error, 404)

    return await render_template(button_template, slug=slug, liked=liked)
|
|
|
|
|
|
|
|
@bp.get("/admin/")
async def admin(slug: str):
    """Render the product admin page for the current request.

    NOTE(review): no permission check is visible in this block — confirm
    that access control is enforced elsewhere.

    Args:
        slug: Product slug from the blueprint URL prefix (not read directly
            in this function).

    Returns:
        A response wrapping the rendered HTML: the full page for a normal
        browser request, or the main panel plus OOB elements for HTMX.
    """
    from suma_browser.app.utils.htmx import is_htmx_request

    # HTMX requests get the partial; everything else gets the full layout.
    template_name = (
        "_types/product/admin/_oob_elements.html"
        if is_htmx_request()
        else "_types/product/admin/index.html"
    )
    html = await render_template(template_name)
    return await make_response(html)
|
|
|
|
|
|
from suma_browser.app.bp.cart.services.identity import current_cart_identity
|
|
#from suma_browser.app.bp.cart.routes import view_cart
|
|
from models.market import CartItem
|
|
from quart import request, url_for
|
|
|
|
@bp.post("/cart")
@clear_cache(tag="browse", tag_scope="user")
async def cart(slug: str):
    """Set the quantity of this product in the current cart.

    The desired quantity is read from ``count`` in the request body (JSON or
    form data) and defaults to 1 when absent or unparseable. A positive count
    sets the quantity, creating the cart line if needed. A count of zero or
    less removes an existing line and is a no-op when there is none.

    Args:
        slug: Product slug from the blueprint URL prefix.

    Returns:
        * 404 ``"Product not found"`` when no non-deleted product has this slug.
        * For HTMX requests, the rendered ``_types/product/_added.html`` fragment.
        * Otherwise a redirect to the cart page.
    """
    # Single lookup: the live (non-deleted) product for this slug.
    product = await g.s.scalar(
        select(Product).where(
            Product.slug == slug,
            Product.deleted_at.is_(None),
        )
    )
    if not product:
        return await make_response("Product not found", 404)
    product_id = product.id

    # Read `count` from the body (JSON or form), default to 1.
    count = 1
    try:
        if request.is_json:
            data = await request.get_json()
            if data is not None and "count" in data:
                count = int(data["count"])
        else:
            form = await request.form
            if "count" in form:
                count = int(form["count"])
    except (ValueError, TypeError, OverflowError):
        # Unparseable count (including a JSON number that decodes to
        # infinity, which makes int() raise OverflowError): fall back to 1.
        count = 1

    ident = current_cart_identity()

    # Existing cart line for this product, if any.
    ci = next(
        (item for item in g.cart if item.product_id == product_id),
        None,
    )

    if ci:
        if count > 0:
            ci.quantity = count
        else:
            # count <= 0 -> remove from the cart entirely. The quantity is
            # zeroed first so the fragment rendered below sees 0.
            ci.quantity = 0
            g.cart.remove(ci)
            await g.s.delete(ci)
    elif count > 0:
        market = getattr(g, "market", None)
        ci = CartItem(
            user_id=ident["user_id"],
            session_id=ident["session_id"],
            product_id=product.id,
            product=product,
            quantity=count,
            market_place_id=market.id if market else None,
        )
        g.cart.append(ci)
        g.s.add(ci)
    # count <= 0 and no existing item: nothing to do.

    # No explicit commit here.
    # NOTE(review): presumably the session middleware commits — confirm.

    # HTMX request: return the "added" fragment instead of redirecting.
    if request.headers.get("HX-Request") == "true":
        return await render_template(
            "_types/product/_added.html",
            cart=g.cart,
            item=ci,
            total=total,
        )

    # Normal POST: go to the cart page.
    from shared.urls import cart_url

    return redirect(cart_url("/"))
|
|
|
|
|
|
|
|
return bp
|