Phase 1-3 of decoupling: - path_setup.py adds project root to sys.path - Market-owned models in market/models/ (market, market_place) - All imports updated: shared.infrastructure, shared.db, shared.browser, etc. - MarketPlace uses container_type/container_id instead of post_id FK Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
368 lines
10 KiB
Python
368 lines
10 KiB
Python
from __future__ import annotations
|
|
import os, json
|
|
from typing import List, Optional
|
|
from shared.config import config
|
|
from .blacklist.product import is_product_blocked
|
|
|
|
|
|
def _json(path: str):
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
return json.load(f)
|
|
|
|
|
|
def fs_nav():
    """Return the parsed contents of ``nav.json`` stored under the cache root.

    The cache root is read from ``config()["cache"]["fs_root"]``.
    """
    cache_root = config()["cache"]["fs_root"]
    return _json(os.path.join(cache_root, "nav.json"))
|
|
|
|
|
|
def _brand_of(item: dict) -> str:
|
|
b = (item.get("brand") or "").strip()
|
|
if b:
|
|
return b
|
|
try:
|
|
return (item.get("info_table", {}).get("Brand") or "").strip()
|
|
except Exception:
|
|
return ""
|
|
|
|
|
|
def _stickers_of(item: dict) -> List[str]:
|
|
vals = item.get("stickers") or []
|
|
out = []
|
|
for v in vals:
|
|
s = (str(v) or "").strip().lower()
|
|
if s:
|
|
out.append(s)
|
|
return out
|
|
|
|
|
|
def fs_product_by_slug(slug: str):
    """Load the product JSON for *slug* from ``<fs_root>/products``.

    *slug* is stripped; ``None`` is treated as an empty string. A ``.json``
    suffix is appended unless the slug already ends with one.
    """
    # NOTE(review): the slug is joined into the path as-is; callers presumably
    # strip path separators before calling -- confirm.
    name = (slug or "").strip()
    filename = name if name.endswith(".json") else f"{name}.json"
    products_dir = os.path.join(config()["cache"]["fs_root"], "products")
    return _json(os.path.join(products_dir, filename))
|
|
|
|
|
|
def fs_count_products_in_sub(top_slug: str, sub_slug: Optional[str]) -> int:
    """Count the non-blocked products listed for ``(top_slug, sub_slug)``.

    A falsy *sub_slug* selects the top-level category listing. Returns 0
    when the listing file does not exist or cannot be loaded.
    """
    cache_root = config()["cache"]["fs_root"]

    # listings/<top>[/<sub>]/items.json
    segments = ["listings", top_slug]
    segments += [sub_slug] if sub_slug else []
    listing_file = os.path.join(cache_root, *segments, "items.json")

    if not os.path.exists(listing_file):
        return 0

    try:
        listed_slugs = _json(listing_file)
    except Exception:
        return 0

    # Only products that are not blocked count towards the total.
    return sum(1 for slug in listed_slugs if not is_product_blocked(slug))
|
|
|
|
|
|
# Accepted spellings of the price sort modes (compared after strip + lower).
_FS_PRICE_ASC_SORTS = frozenset(
    {"price-asc", "price_asc", "price-low", "price-low-high", "low-high", "lo-hi"}
)
_FS_PRICE_DESC_SORTS = frozenset(
    {"price-desc", "price_desc", "price-high", "price-high-low", "high-low", "hi-lo"}
)


def _fs_clean_terms(values: Optional[List[str]]) -> List[str]:
    """Strip and lowercase *values*, dropping ``None`` and blank entries."""
    return [v.strip().lower() for v in (values or []) if (v or "").strip()]


def _fs_title_key(item: dict) -> tuple:
    """Sort key: lowercased title (falling back to name, then slug), then slug."""
    title = item.get("title") or item.get("name") or item.get("slug") or ""
    return (title.strip().lower(), item.get("slug") or "")


def _fs_price_key(item: dict, descending: bool = False) -> tuple:
    """Sort key by ``regular_price``; items without a price always sort last."""
    price = item.get("regular_price")  # .get: a missing key is treated like None
    title, slug = _fs_title_key(item)
    if price is None:
        return (1, 0, title, slug)
    return (0, -price if descending else price, title, slug)


def _fs_item_brand(item: dict) -> str:
    """Return the stripped brand from ``brand`` or ``info_table["Brand"]``."""
    brand = item.get("brand") or (item.get("info_table") or {}).get("Brand")
    return (brand or "").strip()


def _fs_item_tags(item: dict, field: str) -> List[str]:
    """Return the non-blank string entries of ``item[field]``, stripped and lowercased."""
    tags = []
    for raw in item.get(field) or []:
        if isinstance(raw, str):
            tag = raw.strip().lower()
            if tag:
                tags.append(tag)
    return tags


def _fs_collect_slugs(fs_root: str, top_slug, sub_slug) -> List[str]:
    """Return the non-blocked product slugs in scope for the given category."""
    if top_slug:
        # Normal listing path: listings/<top>[/<sub>]/items.json
        segments = ["listings", top_slug]
        if sub_slug:
            segments.append(sub_slug)
        listing_file = os.path.join(fs_root, *segments, "items.json")
        if not os.path.exists(listing_file):
            return []
        try:
            return [s for s in _json(listing_file) if not is_product_blocked(s)]
        except Exception:
            # Best effort: an unreadable listing yields no products.
            return []

    # No top slug: include ALL products from products/*.json
    products_dir = os.path.join(fs_root, "products")
    try:
        return [
            fname[:-5]  # strip ".json"
            for fname in os.listdir(products_dir)
            if fname.endswith(".json") and not is_product_blocked(fname[:-5])
        ]
    except FileNotFoundError:
        return []


def _fs_load_items(slugs: List[str]) -> List[dict]:
    """Load the product dict for each slug, skipping unreadable or non-dict ones."""
    items = []
    for slug in slugs:
        try:
            item = fs_product_by_slug(slug)
        except Exception:
            continue
        if isinstance(item, dict):
            items.append(item)
    return items


def _fs_facets(counts: dict) -> List[dict]:
    """Turn a name->count mapping into rows sorted by count DESC, then name ASC."""
    rows = [{"name": name, "count": count} for name, count in counts.items()]
    rows.sort(key=lambda row: (-row["count"], row["name"].lower()))
    return rows


def fs_products(
    top_slug: str | None,
    sub_slug: str | None,
    selected_brands: Optional[List[str]] = None,
    selected_stickers: Optional[List[str]] = None,
    selected_labels: Optional[List[str]] = None,
    page: int = 1,
    search: Optional[str] = None,
    sort: Optional[str] = None,
    page_size: int = 20,
    liked_slugs: Optional[List[str]] = None,
    liked: Optional[bool] = None,
):
    """
    Return one page of products plus facet counts for a listing.

    Scope:
      - top_slug / sub_slug select ``listings/<top>[/<sub>]/items.json``;
        a falsy top_slug means every file in ``products/*.json``.
      - blocked products (``is_product_blocked``) are always excluded.

    Filters (all must match):
      - selected_brands: brand equals any selected brand (case-insensitive)
      - selected_stickers / selected_labels: item carries ALL selected values
      - search: substring match against ``description_short`` (case-insensitive)
      - liked: when truthy, only items whose slug is in liked_slugs

    Sort modes (``sort``): "az" (default, also used for unknown values),
    "za", and the price ascending / descending aliases. Items without a
    ``regular_price`` sort last in both price modes.

    Returns:
      {
        "total_pages": int,          # at least 1
        "items": [product dict ...], # filtered + sorted + paginated
        "brands": [{"name": str, "count": int}],
        "stickers": [{"name": str, "count": int}],
        "labels": [{"name": str, "count": int}],
        "liked_count": int,          # items in scope whose slug is in liked_slugs
        "search_count": int,         # items in scope matching ``search``
      }

    All counts are computed over every product in scope, i.e. before the
    brand / sticker / label / search / liked filters are applied. Selected
    facet values with no matching product are still listed with count 0.
    """
    fs_root = config()["cache"]["fs_root"]

    # ---------- Collect + load ----------
    all_items = _fs_load_items(_fs_collect_slugs(fs_root, top_slug, sub_slug))
    # Deterministic base order (os.listdir order is arbitrary).
    all_items.sort(key=_fs_title_key)

    # ---------- Normalise filter inputs ----------
    wanted_brands = set(_fs_clean_terms(selected_brands))
    wanted_stickers = set(_fs_clean_terms(selected_stickers))
    wanted_labels = set(_fs_clean_terms(selected_labels))
    search_q = (search or "").strip().lower()
    liked_lookup = set(_fs_clean_terms(liked_slugs))

    # ---------- Counts + filtering in one pass ----------
    brand_counts: dict = {}
    # Seed with the *normalised* selections so they share keys with the
    # counted (lowercased) values; raw None/blank/mixed-case seeds used to
    # produce phantom rows or crash the facet sort.
    sticker_counts: dict = dict.fromkeys(wanted_stickers, 0)
    label_counts: dict = dict.fromkeys(wanted_labels, 0)
    liked_count = 0
    search_count = 0
    filtered: List[dict] = []

    for item in all_items:
        brand = _fs_item_brand(item)
        stickers = _fs_item_tags(item, "stickers")
        labels = _fs_item_tags(item, "labels")
        item_liked = (item.get("slug") or "").strip().lower() in liked_lookup
        description = (item.get("description_short") or "").strip().lower()
        item_matches_search = not search_q or search_q in description

        if brand:
            brand_counts[brand] = brand_counts.get(brand, 0) + 1
        for tag in stickers:
            sticker_counts[tag] = sticker_counts.get(tag, 0) + 1
        for tag in labels:
            label_counts[tag] = label_counts.get(tag, 0) + 1
        if item_liked:
            liked_count += 1
        if item_matches_search:
            search_count += 1

        if wanted_brands and brand.lower() not in wanted_brands:
            continue
        if not wanted_stickers.issubset(stickers):
            continue
        if not wanted_labels.issubset(labels):
            continue
        if not item_matches_search:
            continue
        if liked and not item_liked:
            continue
        filtered.append(item)

    # Keep selected brands visible with a zero count, but only when no
    # product brand matches them case-insensitively (brand names keep case).
    known_brands = {name.lower() for name in brand_counts}
    for raw in selected_brands or []:
        name = (raw or "").strip()
        if name and name.lower() not in known_brands:
            brand_counts[name] = 0
            known_brands.add(name.lower())

    # ---------- Sorting ----------
    sort_mode = (sort or "az").strip().lower()
    if sort_mode == "za":
        filtered.sort(key=_fs_title_key, reverse=True)
    elif sort_mode in _FS_PRICE_ASC_SORTS:
        filtered.sort(key=_fs_price_key)
    elif sort_mode in _FS_PRICE_DESC_SORTS:
        filtered.sort(key=lambda it: _fs_price_key(it, descending=True))
    else:
        # "az" and any unrecognised mode
        filtered.sort(key=_fs_title_key)

    # ---------- Pagination ----------
    page_size = max(1, page_size)  # guard against division by zero
    total_pages = max(1, (len(filtered) + page_size - 1) // page_size)
    page = max(1, page)
    start = (page - 1) * page_size
    page_items = filtered[start:start + page_size]

    return {
        "total_pages": total_pages,
        "items": page_items,
        "brands": _fs_facets(brand_counts),
        "stickers": _fs_facets(sticker_counts),
        "labels": _fs_facets(label_counts),
        "liked_count": liked_count,
        "search_count": search_count,
    }
|
|
|
|
# async wrappers (unchanged)
|
|
async def read_nav():
    """Async wrapper that returns the result of ``fs_nav()``."""
    nav = fs_nav()
    return nav
|
|
|
|
async def read_listing(top_slug: str, sub_slug: str | None, page: int):
    """Async wrapper: return page *page* of the unfiltered listing.

    Delegates to ``fs_products`` with no brand, sticker or label selection.
    """
    # ``page`` must go by keyword: the fifth positional parameter of
    # fs_products is ``selected_labels``, so the former positional call
    # passed the page number as a label filter.
    return fs_products(top_slug, sub_slug, page=page)
|
|
|
|
async def read_product(slug_or_path: str):
    """Async wrapper: load a product given its slug or a URL-like path.

    The query string (everything from the first ``?``) is discarded, then
    the last non-empty ``/``-separated segment is used as the slug and
    passed to ``fs_product_by_slug``.
    """
    raw = (slug_or_path or "").strip()
    # Cut the query string first so a "/" inside it (e.g. "?next=/cart")
    # is not mistaken for a path separator.
    path = raw.split("?", 1)[0]
    # Ignore trailing slashes so "/products/foo/" still resolves to "foo".
    slug = path.rstrip("/").rsplit("/", 1)[-1]
    return fs_product_by_slug(slug)
|