# NOTE(review): the text below is a commit message (plus file stats) that was
# pasted verbatim at the top of this module — a syntax error in Python. It is
# preserved here as a comment; it belongs in VCS history, not in the source.
#
# Phase 1-3 of decoupling:
#   - path_setup.py adds project root to sys.path
#   - Market-owned models in market/models/ (market, market_place)
#   - All imports updated: shared.infrastructure, shared.db, shared.browser, etc.
#   - MarketPlace uses container_type/container_id instead of post_id FK
#   Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
from __future__ import annotations
|
|
|
|
import math
|
|
import re
|
|
from typing import Callable, Dict, List, Optional, Tuple
|
|
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
|
|
|
|
|
|
from .http_client import fetch
|
|
from bp.browse.services.slugs import product_slug_from_href
|
|
from bp.browse.services.state import (
|
|
KNOWN_PRODUCT_SLUGS,
|
|
_listing_page_cache,
|
|
_listing_page_ttl,
|
|
_listing_variant_cache,
|
|
_listing_variant_ttl,
|
|
now,
|
|
)
|
|
from shared.utils import normalize_text, soup_of
|
|
from shared.config import config
|
|
|
|
|
|
def parse_total_pages_from_text(text: str) -> Optional[int]:
    """Derive the page count from a "Showing X of Y" fragment in listing text.

    Returns None when the marker is absent or unusable (e.g. "Showing 0 of 0"),
    so callers can fall back to scanning pagination links.
    """
    m = re.search(r"Showing\s+(\d+)\s+of\s+(\d+)", text, re.I)
    if not m:
        return None
    shown = int(m.group(1))
    total = int(m.group(2))
    # The grid pages at 36 items even when fewer are rendered on page 1.
    per_page = 36 if shown in (12, 24, 36) else shown
    if per_page <= 0:
        # Guard: "Showing 0 of N" previously caused a ZeroDivisionError.
        return None
    return max(1, math.ceil(total / per_page))
|
|
|
|
|
|
def _first_from_srcset(val: str) -> Optional[str]:
|
|
if not val:
|
|
return None
|
|
first = val.split(",")[0].strip()
|
|
parts = first.split()
|
|
return parts[0] if parts else first
|
|
|
|
|
|
def _abs_url(u: Optional[str]) -> Optional[str]:
    """Resolve a site-relative path against the configured base URL."""
    if not u:
        return None
    if isinstance(u, str) and u.startswith("/"):
        return urljoin(config()["base_url"], u)
    return u
|
|
|
|
|
|
def _collect_img_candidates(el) -> List[str]:
    """Gather every candidate image URL carried by a tag's attributes."""
    if not el:
        return []
    found: List[str] = []
    # Plain single-URL attributes, in priority order.
    for attr in ("src", "data-src", "data-original", "data-zoom-image", "data-thumb", "content", "href"):
        value = el.get(attr)
        if value:
            found.append(value)
    # srcset-style attributes hold comma-separated candidates; keep the first.
    for attr in ("srcset", "data-srcset"):
        value = el.get(attr)
        if not value:
            continue
        url = _first_from_srcset(value)
        if url:
            found.append(url)
    return found
|
|
|
|
|
|
def _dedupe_preserve_order_by(seq: List[str], key: Callable[[str], str]) -> List[str]:
|
|
seen = set()
|
|
out: List[str] = []
|
|
for s in seq:
|
|
if not s:
|
|
continue
|
|
k = key(s)
|
|
if k in seen:
|
|
continue
|
|
seen.add(k)
|
|
out.append(s)
|
|
return out
|
|
|
|
|
|
def _filename_key(u: str) -> str:
|
|
p = urlparse(u)
|
|
path = p.path or ""
|
|
if path.endswith("/"):
|
|
path = path[:-1]
|
|
last = path.split("/")[-1]
|
|
return f"{p.netloc}:{last}".lower()
|
|
|
|
|
|
def _parse_cards_from_soup(soup) -> List[str]:
    """Extract product slugs from a listing-page soup.

    Tries three strategies in order until one yields results:
      1. Magento 2 default product-card selectors.
      2. Any product-looking anchors (*.html/*.htm) inside the products container.
      3. A JSON-LD (ItemList/Product) fallback.
    De-duplicates by slug so overlapping selectors don't produce doubles.
    Every discovered slug is also recorded in KNOWN_PRODUCT_SLUGS.

    Fixes vs. previous revision: the return annotation said List[Dict] but the
    function returns slugs; json.loads ran with its try/except commented out,
    so malformed JSON-LD crashed the parser; anchors without an href raised
    AttributeError; falsy slugs were added to KNOWN_PRODUCT_SLUGS (and, via the
    secondary path, to the results).
    """
    items: List[str] = []
    seen_slugs: set[str] = set()

    def _register(href: Optional[str]) -> None:
        # Shared slug bookkeeping for all three strategies.
        if not href:
            return
        if href.startswith("/"):
            href = urljoin(config()["base_url"], href)
        slug = product_slug_from_href(href)
        if not slug:
            return
        KNOWN_PRODUCT_SLUGS.add(slug)
        if slug not in seen_slugs:
            seen_slugs.add(slug)
            items.append(slug)

    # Primary selectors (Magento 2 default)
    card_wrappers = soup.select(
        "li.product-item, .product-item, ol.products.list.items li, .products.list.items li, .product-item-info"
    )
    for card in card_wrappers:
        a = (
            card.select_one("a.product-item-link")
            or card.select_one(".product-item-name a")
            or card.select_one("a[href$='.html'], a[href$='.htm']")
        )
        if not a:
            continue
        _register(a.get("href"))

    # Secondary: any product-looking anchors inside products container
    if not items:
        products_container = soup.select_one(".products") or soup
        for a in products_container.select("a[href$='.html'], a[href$='.htm']"):
            _register(a.get("href"))

    # Tertiary: JSON-LD fallback (ItemList/Product)
    if not items:
        import json

        def _register_entity(ent) -> None:
            # A JSON-LD Product entity; only its url matters for slugs.
            if isinstance(ent, dict):
                _register(ent.get("url"))

        def _register_item_list(elements) -> None:
            # Walk an ItemList's itemListElement entries ("item" wrapper optional).
            if not isinstance(elements, list):
                return
            for it in elements:
                if isinstance(it, dict):
                    _register_entity(it.get("item") or it)

        for script in soup.find_all("script", attrs={"type": "application/ld+json"}):
            try:
                data = json.loads(script.get_text())
            except ValueError:
                # Malformed JSON-LD is common in the wild; skip the script tag.
                continue
            entries = data if isinstance(data, list) else [data]
            for ent in entries:
                if not isinstance(ent, dict):
                    continue
                if ent.get("@type") == "Product":
                    _register_entity(ent)
                if ent.get("@type") == "ItemList":
                    _register_item_list(ent.get("itemListElement"))

    return items
|
|
|
|
|
|
def _with_query(url: str, add: Dict[str, str]) -> str:
|
|
p = urlparse(url)
|
|
q = dict(parse_qsl(p.query, keep_blank_values=True))
|
|
q.update(add)
|
|
new_q = urlencode(q)
|
|
return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))
|
|
|
|
|
|
def _with_page(url: str, page: int) -> str:
    """Append the ?p= pagination parameter for pages beyond the first."""
    if not page or page <= 1:
        return url
    return _with_query(url, {"p": str(page)})
|
|
|
|
|
|
def _listing_base_key(url: str) -> str:
|
|
p = urlparse(url)
|
|
path = p.path.rstrip("/")
|
|
return f"{p.scheme}://{p.netloc}{path}".lower()
|
|
|
|
|
|
def _variant_cache_get(base_key: str) -> Optional[str]:
    """Return the cached working listing URL for *base_key*, evicting stale entries."""
    entry = _listing_variant_cache.get(base_key)
    if not entry:
        return None
    cached_url, stored_at = entry
    if (now() - stored_at) > _listing_variant_ttl:
        # TTL expired: drop the entry so the next lookup re-probes.
        _listing_variant_cache.pop(base_key, None)
        return None
    return cached_url
|
|
|
|
|
|
def _variant_cache_set(base_key: str, working_url: str) -> None:
    """Remember which listing URL variant worked, stamped with the current time."""
    _listing_variant_cache[base_key] = (working_url, now())
|
|
|
|
|
|
def _page_cache_get(working_url: str, page: int) -> Optional[Tuple[List[Dict], int]]:
    """Fetch a cached (items, total_pages) result for a listing page, honoring the TTL."""
    cache_key = f"{working_url}|p={page}"
    entry = _listing_page_cache.get(cache_key)
    if not entry:
        return None
    payload, stored_at = entry
    if (now() - stored_at) > _listing_page_ttl:
        # Stale: evict so a fresh fetch repopulates it.
        _listing_page_cache.pop(cache_key, None)
        return None
    cached_items, cached_total = payload
    return cached_items, cached_total
|
|
|
|
|
|
def _page_cache_set(working_url: str, page: int, items: List[Dict], total_pages: int) -> None:
    """Store a listing page's parse result under its URL+page cache key."""
    _listing_page_cache[f"{working_url}|p={page}"] = ((items, total_pages), now())
|
|
|
|
|
|
async def _fetch_parse(url: str, page: int):
    """Fetch one listing page and return (parsed items, soup)."""
    page_url = _with_page(url, page)
    markup = await fetch(page_url)
    parsed = soup_of(markup)
    return _parse_cards_from_soup(parsed), parsed
|
|
|
|
|
|
|
|
|
|
async def scrape_products(list_url: str, page: int = 1):
    """Fetch one listing page and return (product_slugs, total_pages).

    NOTE(review): the old docstring claimed "variant memoization + page cache",
    but this function used neither — it only computed _listing_base_key() and
    discarded the result. That dead call is removed here; the cache helpers
    (_variant_cache_* / _page_cache_*) remain available if callers want them.
    """
    items, soup = await _fetch_parse(list_url, page)
    total_pages = _derive_total_pages(soup)
    return items, total_pages
|
|
|
|
def _derive_total_pages(soup) -> int:
    """Work out the listing's page count from its visible text, falling back to pagination links."""
    text = normalize_text(soup.get_text(" "))
    from_counts = parse_total_pages_from_text(text)
    if from_counts:
        return from_counts
    # No "Showing X of Y" marker: take the highest ?p=N seen in any link.
    highest = 1
    for anchor in soup.find_all("a", href=True):
        match = re.search(r"[?&]p=(\d+)", anchor["href"])
        if match:
            highest = max(highest, int(match.group(1)))
    return highest
|
|
|
|
|
|
def _slugs_from_list_url(list_url: str) -> Tuple[str, Optional[str]]:
|
|
p = urlparse(list_url)
|
|
parts = [x for x in (p.path or "").split("/") if x]
|
|
top = parts[0].lower() if parts else ""
|
|
sub = None
|
|
if len(parts) >= 2:
|
|
sub = parts[1]
|
|
if sub.lower().endswith((".html", ".htm")):
|
|
sub = re.sub(r"\.(html?|HTML?)$", "", sub)
|
|
return top, sub
|