This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
market/bp/browse/services/services.py
giles 9b2687b039
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 41s
feat: restructure market app with per-market URL scoping
- URL structure changes from /<route> to /<market_slug>/<route>
- Root / shows markets listing page
- app.py: url_value_preprocessor, url_defaults, hydrate_market (events app pattern)
- Browse queries (db_nav, db_products_nocounts, db_products_counts) accept market_id
- _productInfo reads g.market.id to scope all queries
- save_nav accepts market_id, sets on new NavTop rows
- API save_nav passes g.market.id
- Scraper default URLs point to /suma-market/ on port 8001

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-10 18:08:48 +00:00

186 lines
5.4 KiB
Python

from __future__ import annotations
from urllib.parse import urljoin
from quart import (
g,
request,
)
from config import config
from .products import products, products_nocounts
from .blacklist.product_details import is_blacklisted_heading
from utils import host_url
from sqlalchemy import select
from models import ProductLike
from ...market.filters.qs import decode
def _hx_fragment_request() -> bool:
    """
    Tell whether the current request carries an ``HX-Request: true`` header.

    The header value is compared case-insensitively; a missing header is
    treated as an empty string and therefore yields False.
    """
    hx_header = request.headers.get("HX-Request", "")
    return hx_header.lower() == "true"
async def _productInfo(top_slug=None, sub_slug=None):
    """
    Build the template context shared by home / category / subcategory pages.

    Filter values (page, search, sort, selected brands / stickers / labels,
    liked) are taken from ``decode()``; the market id is read from
    ``g.market`` when that attribute is present and truthy.

    Two shapes of result are produced:
    - Requests that are not HX fragment requests, and any request for page 1,
      call ``products()`` and receive the full context (items, paging, facet
      lists and counts). Only the brand list is reordered here, so that the
      selected brands come first.
    - HX fragment requests for any other page call ``products_nocounts()`` and
      receive just the items and paging fields.
    """
    filters = decode()
    page, search, sort = filters.page, filters.search, filters.sort
    selected_brands = filters.selected_brands
    selected_stickers = filters.selected_stickers
    selected_labels = filters.selected_labels
    liked = filters.liked

    # Market scoping is optional: g.market may be absent.
    current_market = getattr(g, "market", None)
    market_id = None
    if current_market:
        market_id = current_market.id

    # No slug -> empty listing key; top only -> the bare slug;
    # top + sub -> URL joined onto the configured base_url.
    if top_slug is None:
        list_url = ""
    elif sub_slug is None:
        list_url = top_slug
    else:
        list_url = urljoin(config()["base_url"], f"/{top_slug}/{sub_slug}")

    # Equivalent to the negation of "not HX request, or page 1".
    later_fragment_page = _hx_fragment_request() and page != 1

    # Keyword arguments common to both product queries.
    query_args = {
        "selected_brands": selected_brands,
        "selected_stickers": selected_stickers,
        "selected_labels": selected_labels,
        "page": page,
        "search": search,
        "sort": sort,
        "user_id": g.user.id if g.user else None,
        "liked": liked,
        "market_id": market_id,
    }

    if later_fragment_page:
        # Follow-up fragment: skip the facet/count work entirely.
        items, total_pages = await products_nocounts(g.s, list_url, **query_args)
        return {
            "products": items,
            "page": page,
            "search": search,
            "sort": sort,
            "total_pages": int(total_pages or 1),
        }

    (
        items,
        brands,
        stickers,
        labels,
        total_pages,
        liked_count,
        search_count,
    ) = await products(list_url, **query_args)
    brands_ordered = _order_brands_selected_first(brands, selected_brands)

    context = {
        "products": items,
        "page": page,
        "search": search,
        "sort": sort,
        "total_pages": int(total_pages or 1),
    }
    context.update(
        {
            "brands": brands_ordered,
            "selected_brands": selected_brands,
            "stickers": stickers,
            "selected_stickers": selected_stickers,
            "labels": labels,
            "selected_labels": selected_labels,
            "liked": liked,
            "liked_count": liked_count,
            "search_count": search_count,
        }
    )
    return context
def _order_brands_selected_first(brands, selected):
"""Return brands with the selected brand(s) first."""
if not brands or not selected:
return brands
sel = [(s or "").strip() for s in selected]
head = [s for s in brands if (s.get("name") or "").strip() in sel]
tail = [s for s in brands if (s.get("name") or "").strip() not in sel]
return head + tail
def _order_stickers_selected_first(
stickers: list[dict], selected_stickers: list[str] | None
):
if not stickers or not selected_stickers:
return stickers
sel = [(s or "").strip().lower() for s in selected_stickers]
head = [s for s in stickers if (s.get("name") or "").strip().lower() in sel]
tail = [
s
for s in stickers
if (s.get("name") or "").strip().lower() not in sel
]
return head + tail
def _order_labels_selected_first(
labels: list[dict], selected_labels: list[str] | None
):
if not labels or not selected_labels:
return labels
sel = [(s or "").strip().lower() for s in selected_labels]
head = [s for s in labels if (s.get("name") or "").strip().lower() in sel]
tail = [
s
for s in labels
if (s.get("name") or "").strip().lower() not in sel
]
return head + tail
def _massage_product(d):
    """
    Prepare a product dict for the templates.

    Returns a shallow copy of ``d`` in which:
    - the ``[**__APP_ROOT__**]`` placeholder in ``description_html`` and in
      every kept section's ``html`` is replaced with ``g.root``;
    - sections whose ``title`` is flagged by ``is_blacklisted_heading`` are
      left out.

    The input dict and its section dicts are not modified.
    """
    placeholder = "[**__APP_ROOT__**]"

    def with_app_root(html):
        return html.replace(placeholder, g.root)

    description_html = with_app_root(d["description_html"])

    visible_sections = []
    for section in d["sections"]:
        if is_blacklisted_heading(section["title"]):
            continue
        visible_sections.append(
            {**section, "html": with_app_root(section["html"])}
        )

    product = dict(d)
    product["description_html"] = description_html
    product["sections"] = visible_sections
    return product
# Re-export from canonical shared location
from shared.http_utils import vary as _vary, current_url_without_page as _current_url_without_page
async def _is_liked(user_id: int | None, slug: str) -> bool:
"""
Check if this user has liked this product.
"""
if not user_id:
return False
# because ProductLike has composite PK (user_id, product_slug),
# we can fetch it by primary key dict:
row = await g.s.execute(
select(ProductLike).where(
ProductLike.user_id == user_id,
ProductLike.product_slug == slug,
)
)
row.scalar_one_or_none()
return row is not None