This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
market/bp/product/routes.py
giles 291c829c7f
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 1m0s
Let resolve_product run fully for POST, just skip redirects
Instead of skipping resolve_product entirely for POST (leaving
g.item_data unset and breaking templates), run the full resolution
but suppress canonical-slug redirects. This sets g.item_data for
the context processor so d.slug and other template vars work.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-24 10:22:04 +00:00

286 lines
9.7 KiB
Python

from __future__ import annotations
from quart import (
g,
Blueprint,
abort,
redirect,
render_template,
make_response,
)
from sqlalchemy import select, func, update
from models.market import Product, ProductLike
from ..browse.services.slugs import canonical_html_slug
from ..browse.services.blacklist.product import is_product_blocked
from ..browse.services import db_backend as cb
from ..browse.services import _massage_product
from shared.utils import host_url
from shared.browser.app.redis_cacher import cache_page, clear_cache
from ..cart.services import total
from .services.product_operations import toggle_product_like, massage_full_product
def register():
bp = Blueprint("product", __name__, url_prefix="/product/<slug>")
@bp.url_value_preprocessor
def pull_blog(endpoint, values):
    """Store the product slug taken from the request path on ``g.product_slug``.

    The slug is read from the URL path rather than from ``values``: per the
    original author's note, an app-level preprocessor has already popped
    both "slug" and "page_slug" into ``g.post_slug`` (page_slug winning), so
    the product slug is no longer available there.

    The slug is the path segment immediately after the first ``product``
    segment (``.../product/<slug>/...``). ``g.product_slug`` is set to
    ``None`` when there is no ``product`` segment or nothing follows it.
    ``endpoint`` and ``values`` are not used.
    """
    from quart import request as req

    segments = req.path.rstrip("/").split("/")
    slug = None
    if "product" in segments:
        following = segments.index("product") + 1
        if following < len(segments):
            slug = segments[following]
    g.product_slug = slug
# ─────────────────────────────────────────────────────────────
# BEFORE REQUEST: Slug or numeric ID resolver
# ─────────────────────────────────────────────────────────────
@bp.before_request
async def resolve_product():
    """Resolve ``g.product_slug`` to a product and publish it as ``g.item_data``.

    Runs before every request on this blueprint. Does nothing when
    ``g.product_slug`` is missing or ``None``.

    * Numeric slug: the product is loaded by ID (404 if not found). A
      non-deleted product redirects to its canonical slug URL unless the
      request is a POST; deleted products and POST requests get
      ``g.item_data`` populated instead (with ``liked`` fixed to ``False``).
    * Any other slug: 404 if the slug is blocked; redirect to the canonical
      slug when it differs from the requested one (never for POST);
      otherwise the full product is loaded by canonical slug (404 if not
      found) and stored in ``g.item_data``.

    Returns a redirect response when one is needed, otherwise ``None`` so
    the request proceeds to the view.
    """
    # Imported locally so this hook does not depend on the `url_for` import
    # that appears further down in the enclosing function.
    from quart import request as req, url_for

    raw_slug = g.product_slug = getattr(g, "product_slug", None)
    if raw_slug is None:
        return

    # POST requests run the full resolution but never get redirected.
    is_post = req.method == "POST"

    # 1. If slug is an integer → load product by ID.
    # isdecimal() rather than isdigit(): isdigit() is also true for
    # characters such as "²" or "①" that int() rejects with ValueError,
    # which would turn a junk URL into a server error instead of a 404.
    if raw_slug.isdecimal():
        product = await cb.db_product_full_id(
            g.s, int(raw_slug), user_id=g.user.id if g.user else 0
        )
        if not product:
            abort(404)

        # Not deleted → redirect to canonical slug (non-POST only).
        if not product["deleted_at"] and not is_post:
            canon = canonical_html_slug(product["slug"])
            return redirect(
                host_url(url_for("market.browse.product.product_detail", slug=canon))
            )

        # Deleted product (shown as-is) or POST: expose the product as loaded.
        g.item_data = {"d": product, "slug": product["slug"], "liked": False}
        return

    # 2. Normal slug-based behaviour.
    if is_product_blocked(raw_slug):
        abort(404)

    canon = canonical_html_slug(raw_slug)
    if canon != raw_slug and not is_post:
        return redirect(
            host_url(url_for("market.browse.product.product_detail", slug=canon))
        )

    # Hydrate the full product by its canonical slug.
    d = await cb.db_product_full(
        g.s, canon, user_id=g.user.id if g.user else 0
    )
    if not d:
        abort(404)
    g.item_data = {"d": d, "slug": canon, "liked": d.get("is_liked", False)}
@bp.context_processor
def context():
    """Expose the entries of ``g.item_data`` as template variables.

    Returns a shallow copy of ``g.item_data`` when it is set and non-empty,
    otherwise an empty dict.
    """
    resolved = getattr(g, "item_data", None)
    return dict(resolved) if resolved else {}
# ─────────────────────────────────────────────────────────────
# RENDER PRODUCT
# ─────────────────────────────────────────────────────────────
@bp.get("/")
@cache_page(tag="browse")
async def product_detail(slug: str):
    """Render the product page.

    A normal browser request gets the full page with layout; an HTMX
    request gets the main panel plus out-of-band elements. ``slug`` comes
    from the blueprint URL prefix and is not used directly here.
    """
    from shared.browser.app.utils.htmx import is_htmx_request

    template_name = (
        "_types/product/_oob_elements.html"
        if is_htmx_request()
        else "_types/product/index.html"
    )
    return await render_template(template_name)
@bp.post("/like/toggle/")
@clear_cache(tag="browse", tag_scope="user")
async def like_toggle(slug):
    """Toggle the current user's like on the product and return the button HTML.

    ``slug`` is the product slug from the blueprint URL prefix.

    Returns:
        * 403 with the un-liked button markup when no user is logged in;
        * 404 with the error value reported by ``toggle_product_like``
          when it returns one;
        * otherwise the re-rendered like button reflecting the new state.
    """
    button_template = "_types/browse/like/button.html"

    if not g.user:
        html = await render_template(button_template, slug=slug, liked=False)
        # make_response is a coroutine in Quart and must be awaited;
        # returning it un-awaited hands the framework a coroutine object
        # instead of a response.
        return await make_response(html, 403)

    liked, error = await toggle_product_like(g.s, g.user.id, slug)
    if error:
        return await make_response(error, 404)

    return await render_template(button_template, slug=slug, liked=liked)
@bp.get("/admin/")
async def admin(slug: str):
    """Render the product admin page.

    A normal browser request gets the full page with layout; an HTMX
    request gets the main panel plus out-of-band elements. ``slug`` comes
    from the blueprint URL prefix and is not used directly here.
    """
    from shared.browser.app.utils.htmx import is_htmx_request

    if is_htmx_request():
        template_name = "_types/product/admin/_oob_elements.html"
    else:
        template_name = "_types/product/admin/index.html"
    rendered = await render_template(template_name)
    return await make_response(rendered)
from bp.cart.services.identity import current_cart_identity
#from bp.cart.routes import view_cart
from models.market import CartItem
from quart import request, url_for
@bp.post("/cart/")
@clear_cache(tag="browse", tag_scope="user")
async def cart(**_kw):
    """Set the quantity of the current product in the caller's cart.

    The product is looked up by ``g.product_slug`` among non-deleted
    products; 404 "Product not found" is returned when there is none.

    The desired quantity is read from ``count`` in the JSON body or the
    form data and defaults to 1 when absent or unparsable:

    * ``count > 0``: the existing cart line is set to that quantity, or a
      new line is created;
    * ``count <= 0``: an existing line is removed; otherwise nothing happens.

    The cart is selected by user id when the cart identity has one,
    otherwise by session id, and is stored on ``g.cart``. No commit is
    issued here (the original author notes the session middleware is
    expected to handle it).

    Returns the "_added" fragment for HTMX requests, otherwise a redirect
    to the cart page. URL values are accepted via ``**_kw`` and ignored.
    """
    from sqlalchemy.orm import selectinload

    slug = g.product_slug

    # Single round trip: fetch the live (non-deleted) product row directly
    # instead of selecting its id first and then re-selecting by id.
    product = await g.s.scalar(
        select(Product).where(
            Product.slug == slug,
            Product.deleted_at.is_(None),
        )
    )
    if not product:
        return await make_response("Product not found", 404)
    product_id = product.id

    # Read `count` from the body (JSON or form), defaulting to 1.
    count = 1
    try:
        if request.is_json:
            data = await request.get_json()
            if data is not None and "count" in data:
                count = int(data["count"])
        else:
            form = await request.form
            if "count" in form:
                count = int(form["count"])
    except (ValueError, TypeError, OverflowError):
        # Unparsable count → fall back to 1. OverflowError covers a JSON
        # `Infinity`, which parses to float("inf") and cannot become an int.
        count = 1

    ident = current_cart_identity()

    # Load the live cart lines for the current user, or session if anonymous.
    cart_filters = [CartItem.deleted_at.is_(None)]
    if ident["user_id"] is not None:
        cart_filters.append(CartItem.user_id == ident["user_id"])
    else:
        cart_filters.append(CartItem.session_id == ident["session_id"])
    cart_result = await g.s.execute(
        select(CartItem)
        .where(*cart_filters)
        .order_by(CartItem.created_at.desc())
        .options(
            selectinload(CartItem.product),
            selectinload(CartItem.market_place),
        )
    )
    g.cart = list(cart_result.scalars().all())

    ci = next(
        (item for item in g.cart if item.product_id == product_id),
        None,
    )

    if ci:
        if count > 0:
            ci.quantity = count
        else:
            # count <= 0 → remove from cart entirely. Quantity is zeroed
            # first because `ci` is still handed to the template below.
            ci.quantity = 0
            g.cart.remove(ci)
            await g.s.delete(ci)
    elif count > 0:
        market = getattr(g, "market", None)
        ci = CartItem(
            user_id=ident["user_id"],
            session_id=ident["session_id"],
            product_id=product.id,
            product=product,
            quantity=count,
            market_place_id=market.id if market else None,
        )
        g.cart.append(ci)
        g.s.add(ci)
    # count <= 0 with no existing line: nothing to do.

    # HTMX request: return the small "added" fragment.
    if request.headers.get("HX-Request") == "true":
        return await render_template(
            "_types/product/_added.html",
            cart=g.cart,
            item=ci,
            total=total,
            cart_count=sum(i.quantity for i in g.cart),
            cart_total=total(g.cart),
            calendar_total=lambda entries: 0,
            calendar_cart_entries=[],
        )

    # Normal POST: go to the cart page.
    from shared.infrastructure.urls import cart_url
    return redirect(cart_url("/"))
return bp