from __future__ import annotations

import json
import math
import re
from typing import Callable, Dict, List, Optional, Tuple
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse

from .http_client import fetch
from bp.browse.services.slugs import product_slug_from_href
from bp.browse.services.state import (
    KNOWN_PRODUCT_SLUGS,
    _listing_page_cache,
    _listing_page_ttl,
    _listing_variant_cache,
    _listing_variant_ttl,
    now,
)
from shared.utils import normalize_text, soup_of
from shared.config import config


def parse_total_pages_from_text(text: str) -> Optional[int]:
    """Derive a page count from a "Showing N of M" fragment, if present."""
    m = re.search(r"Showing\s+(\d+)\s+of\s+(\d+)", text, re.I)
    if not m:
        return None
    shown = int(m.group(1))
    total = int(m.group(2))
    # Heuristic: the standard grid page-size options are 12/24/36; treat any of
    # them as a full page of 36, otherwise trust the shown count as the page size.
    per_page = 36 if shown in (12, 24, 36) else shown
    return max(1, math.ceil(total / per_page))


def _first_from_srcset(val: str) -> Optional[str]:
    """Return the URL of the first candidate in a srcset-style attribute."""
    if not val:
        return None
    first = val.split(",")[0].strip()
    parts = first.split()
    return parts[0] if parts else first


def _abs_url(u: Optional[str]) -> Optional[str]:
    if not u:
        return None
    return urljoin(config()["base_url"], u) if isinstance(u, str) and u.startswith("/") else u


def _collect_img_candidates(el) -> List[str]:
    """Gather every plausible image URL off an element, lazy-load attributes included."""
    urls: List[str] = []
    if not el:
        return urls
    attrs = ["src", "data-src", "data-original", "data-zoom-image", "data-thumb", "content", "href"]
    for a in attrs:
        v = el.get(a)
        if v:
            urls.append(v)
    for a in ["srcset", "data-srcset"]:
        v = el.get(a)
        if v:
            first = _first_from_srcset(v)
            if first:
                urls.append(first)
    return urls


def _dedupe_preserve_order_by(seq: List[str], key: Callable[[str], str]) -> List[str]:
    """Drop duplicates by `key` while keeping first-seen order."""
    seen = set()
    out: List[str] = []
    for s in seq:
        if not s:
            continue
        k = key(s)
        if k in seen:
            continue
        seen.add(k)
        out.append(s)
    return out


def _filename_key(u: str) -> str:
    """Key a URL by host plus final path segment, so resized/cached copies collapse."""
    p = urlparse(u)
    path = p.path or ""
    if path.endswith("/"):
        path = path[:-1]
    last = path.split("/")[-1]
    return f"{p.netloc}:{last}".lower()
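
# Sketch (not used by the scraper itself): how the image helpers above are meant
# to compose on a single product tile. The function name and flow here are
# illustrative assumptions; `tile` would be a bs4 Tag for one card.
def _example_tile_images(tile) -> List[str]:
    candidates = _collect_img_candidates(tile.select_one("img") if tile else None)
    absolute = [u for u in (_abs_url(c) for c in candidates) if u]
    # Deduping on netloc + filename collapses the same asset served from
    # different resize/cache paths, which plain URL comparison would miss.
    return _dedupe_preserve_order_by(absolute, _filename_key)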
def _parse_cards_from_soup(soup) -> List[str]:
    """Extract product slugs from listing soup.

    De-duplicates by slug to avoid doubles from overlapping selectors.
    """
    items: List[str] = []
    seen_slugs: set[str] = set()

    # Primary selectors (Magento 2 default)
    card_wrappers = soup.select(
        "li.product-item, .product-item, ol.products.list.items li, "
        ".products.list.items li, .product-item-info"
    )
    for card in card_wrappers:
        a = (
            card.select_one("a.product-item-link")
            or card.select_one(".product-item-name a")
            or card.select_one("a[href$='.html'], a[href$='.htm']")
        )
        if not a:
            continue
        href = a.get("href")
        if not href:
            continue
        if href.startswith("/"):
            href = urljoin(config()["base_url"], href)
        slug = product_slug_from_href(href)
        if not slug:
            continue
        KNOWN_PRODUCT_SLUGS.add(slug)
        if slug not in seen_slugs:
            seen_slugs.add(slug)
            items.append(slug)

    # Secondary: any product-looking anchors inside the products container
    if not items:
        products_container = soup.select_one(".products") or soup
        for a in products_container.select("a[href$='.html'], a[href$='.htm']"):
            href = a.get("href")
            if not href:
                continue
            if href.startswith("/"):
                href = urljoin(config()["base_url"], href)
            slug = product_slug_from_href(href)
            if not slug:
                continue
            KNOWN_PRODUCT_SLUGS.add(slug)
            if slug not in seen_slugs:
                seen_slugs.add(slug)
                items.append(slug)

    # Tertiary: JSON-LD fallback (ItemList/Product)
    if not items:

        def add_product(name: Optional[str], url: Optional[str], image: Optional[str]) -> None:
            # name/image are accepted for signature parity with the JSON-LD
            # entities; only the URL matters for slug extraction.
            if not url:
                return
            absu = urljoin(config()["base_url"], url) if url.startswith("/") else url
            slug = product_slug_from_href(absu)
            if not slug:
                return
            KNOWN_PRODUCT_SLUGS.add(slug)
            if slug not in seen_slugs:
                seen_slugs.add(slug)
                items.append(slug)

        for script in soup.find_all("script", attrs={"type": "application/ld+json"}):
            try:
                data = json.loads(script.get_text())
            except Exception:
                continue
            if isinstance(data, dict):
                if data.get("@type") == "ItemList" and isinstance(data.get("itemListElement"), list):
                    for it in data["itemListElement"]:
                        if isinstance(it, dict):
                            ent = it.get("item") or it
                            if isinstance(ent, dict):
                                add_product(
                                    ent.get("name"),
                                    ent.get("url"),
                                    (ent.get("image") if isinstance(ent.get("image"), str) else None),
                                )
                if data.get("@type") == "Product":
                    add_product(
                        data.get("name"),
                        data.get("url"),
                        (data.get("image") if isinstance(data.get("image"), str) else None),
                    )
            elif isinstance(data, list):
                for ent in data:
                    if not isinstance(ent, dict):
                        continue
                    if ent.get("@type") == "Product":
                        add_product(
                            ent.get("name"),
                            ent.get("url"),
                            (ent.get("image") if isinstance(ent.get("image"), str) else None),
                        )
                    if ent.get("@type") == "ItemList":
                        for it in ent.get("itemListElement", []):
                            if isinstance(it, dict):
                                obj = it.get("item") or it
                                if isinstance(obj, dict):
                                    add_product(
                                        obj.get("name"),
                                        obj.get("url"),
                                        (obj.get("image") if isinstance(obj.get("image"), str) else None),
                                    )

    return items


def _with_query(url: str, add: Dict[str, str]) -> str:
    """Merge `add` into the URL's existing query string."""
    p = urlparse(url)
    q = dict(parse_qsl(p.query, keep_blank_values=True))
    q.update(add)
    new_q = urlencode(q)
    return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))


def _with_page(url: str, page: int) -> str:
    if page and page > 1:
        return _with_query(url, {"p": str(page)})
    return url


def _listing_base_key(url: str) -> str:
    """Cache key for a listing: scheme + host + path, query and trailing slash dropped."""
    p = urlparse(url)
    path = p.path.rstrip("/")
    return f"{p.scheme}://{p.netloc}{path}".lower()
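
# Sketch of the expected behavior of the pagination helpers above, written as a
# callable self-check. Values are illustrative, not real catalog URLs.
def _example_pagination_roundtrip() -> None:
    base = "https://shop.example/women/tops.html"
    # Page 1 keeps the bare URL; higher pages get a `p=` parameter merged in.
    assert _with_page(base, 1) == base
    # `_with_query` merges into the existing query string rather than appending
    # blindly, so repeated calls never stack duplicate `p=` parameters.
    assert _with_page(base + "?order=price", 3) == base + "?order=price&p=3"
    # The base key ignores the query string entirely.
    assert _listing_base_key(base + "?p=2") == "https://shop.example/women/tops.html"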
def _variant_cache_get(base_key: str) -> Optional[str]:
    info = _listing_variant_cache.get(base_key)
    if not info:
        return None
    url, ts = info
    if (now() - ts) > _listing_variant_ttl:
        # Entry expired: evict and report a miss.
        _listing_variant_cache.pop(base_key, None)
        return None
    return url


def _variant_cache_set(base_key: str, working_url: str) -> None:
    _listing_variant_cache[base_key] = (working_url, now())


def _page_cache_get(working_url: str, page: int) -> Optional[Tuple[List[str], int]]:
    key = f"{working_url}|p={page}"
    info = _listing_page_cache.get(key)
    if not info:
        return None
    (items, total_pages), ts = info
    if (now() - ts) > _listing_page_ttl:
        _listing_page_cache.pop(key, None)
        return None
    return items, total_pages


def _page_cache_set(working_url: str, page: int, items: List[str], total_pages: int) -> None:
    key = f"{working_url}|p={page}"
    _listing_page_cache[key] = ((items, total_pages), now())


async def _fetch_parse(url: str, page: int):
    html = await fetch(_with_page(url, page))
    soup = soup_of(html)
    items = _parse_cards_from_soup(soup)
    return items, soup


async def scrape_products(list_url: str, page: int = 1):
    """Fast listing fetch with variant memoization + page cache."""
    base_key = _listing_base_key(list_url)
    # Reuse a previously working URL variant for this listing, if one is cached.
    working_url = _variant_cache_get(base_key) or list_url
    cached = _page_cache_get(working_url, page)
    if cached:
        return cached
    items, soup = await _fetch_parse(working_url, page)
    total_pages = _derive_total_pages(soup)
    _variant_cache_set(base_key, working_url)
    _page_cache_set(working_url, page, items, total_pages)
    return items, total_pages


def _derive_total_pages(soup) -> int:
    """Prefer the "Showing N of M" text; fall back to the largest p= in pager links."""
    total_pages = 1
    textdump = normalize_text(soup.get_text(" "))
    pages_from_text = parse_total_pages_from_text(textdump)
    if pages_from_text:
        total_pages = pages_from_text
    else:
        pages = {1}
        for a in soup.find_all("a", href=True):
            m = re.search(r"[?&]p=(\d+)", a["href"])
            if m:
                pages.add(int(m.group(1)))
        total_pages = max(pages)
    return total_pages


def _slugs_from_list_url(list_url: str) -> Tuple[str, Optional[str]]:
    """Split a listing URL path into (top category, optional subcategory) slugs."""
    p = urlparse(list_url)
    parts = [x for x in (p.path or "").split("/") if x]
    top = parts[0].lower() if parts else ""
    sub = None
    if len(parts) >= 2:
        sub = parts[1]
        if sub.lower().endswith((".html", ".htm")):
            sub = re.sub(r"\.html?$", "", sub, flags=re.I)
    return top, sub
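

# Minimal usage sketch. The URL is illustrative; because this module uses a
# relative import, run it as a package module (python -m <package>.<module>)
# rather than as a bare script.
if __name__ == "__main__":
    import asyncio

    async def _demo() -> None:
        slugs, total_pages = await scrape_products(
            "https://shop.example/women/tops.html", page=1
        )
        print(f"{len(slugs)} product slugs on page 1 of {total_pages}")

    asyncio.run(_demo())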