market/scrape/listings.py
giles 6271a715a1
feat: initialize market app with browsing, product, and scraping code
Split from coop monolith. Includes:
- Market/browse/product blueprints
- Product sync API
- Suma scraping pipeline
- Templates for market, browse, and product views
- Dockerfile and CI workflow for independent deployment
2026-02-09 23:16:34 +00:00

from __future__ import annotations
import json
import math
import re
from typing import Callable, Dict, List, Optional, Tuple
from urllib.parse import parse_qsl, urlencode, urljoin, urlparse, urlunparse
from .http_client import fetch
from suma_browser.app.bp.browse.services.slugs import product_slug_from_href
from suma_browser.app.bp.browse.services.state import (
KNOWN_PRODUCT_SLUGS,
_listing_page_cache,
_listing_page_ttl,
_listing_variant_cache,
_listing_variant_ttl,
now,
)
from utils import normalize_text, soup_of
from config import config
def parse_total_pages_from_text(text: str) -> Optional[int]:
m = re.search(r"Showing\s+(\d+)\s+of\s+(\d+)", text, re.I)
if not m:
return None
shown = int(m.group(1))
total = int(m.group(2))
per_page = 36 if shown in (12, 24, 36) else shown
return max(1, math.ceil(total / per_page))
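# Illustrative example (hypothetical input, not part of the original module):
# shown values of 12, 24, or 36 are all normalized to a 36-per-page grid
# before the division.
#   >>> parse_total_pages_from_text("Showing 36 of 120")
#   4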
def _first_from_srcset(val: str) -> Optional[str]:
if not val:
return None
first = val.split(",")[0].strip()
parts = first.split()
return parts[0] if parts else first
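# Illustrative: the first srcset candidate wins, width descriptor stripped.
#   >>> _first_from_srcset("/m/p1_small.jpg 480w, /m/p1_large.jpg 800w")
#   '/m/p1_small.jpg'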
def _abs_url(u: Optional[str]) -> Optional[str]:
if not u:
return None
return urljoin(config()["base_url"], u) if isinstance(u, str) and u.startswith("/") else u
def _collect_img_candidates(el) -> List[str]:
urls: List[str] = []
if not el:
return urls
attrs = ["src", "data-src", "data-original", "data-zoom-image", "data-thumb", "content", "href"]
for a in attrs:
v = el.get(a)
if v:
urls.append(v)
for a in ["srcset", "data-srcset"]:
v = el.get(a)
if v:
first = _first_from_srcset(v)
if first:
urls.append(first)
return urls
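# Illustrative sketch (hypothetical markup): for a tag like
#   <img data-src="/m/p1.jpg" srcset="/m/p1_s.jpg 1x, /m/p1_l.jpg 2x">
# the candidates come out as ["/m/p1.jpg", "/m/p1_s.jpg"] -- direct
# attributes first, then the first srcset entry.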
def _dedupe_preserve_order_by(seq: List[str], key: Callable[[str], str]) -> List[str]:
seen = set()
out: List[str] = []
for s in seq:
if not s:
continue
k = key(s)
if k in seen:
continue
seen.add(k)
out.append(s)
return out
def _filename_key(u: str) -> str:
p = urlparse(u)
path = p.path or ""
if path.endswith("/"):
path = path[:-1]
last = path.split("/")[-1]
return f"{p.netloc}:{last}".lower()
def _parse_cards_from_soup(soup) -> List[str]:
    """Extract product slugs from listing soup.
    De-duplicate by slug to avoid doubles from overlapping selectors."""
    items: List[str] = []
seen_slugs: set[str] = set()
# Primary selectors (Magento 2 default)
card_wrappers = soup.select(
"li.product-item, .product-item, ol.products.list.items li, .products.list.items li, .product-item-info"
)
for card in card_wrappers:
a = (
card.select_one("a.product-item-link")
or card.select_one(".product-item-name a")
or card.select_one("a[href$='.html'], a[href$='.htm']")
)
if not a:
continue
        href = a.get("href")
        if not href:
            continue
        if href.startswith("/"):
            href = urljoin(config()["base_url"], href)
        slug = product_slug_from_href(href)
        if not slug:
            continue
        KNOWN_PRODUCT_SLUGS.add(slug)
        if slug not in seen_slugs:
            seen_slugs.add(slug)
            items.append(slug)
# Secondary: any product-looking anchors inside products container
if not items:
products_container = soup.select_one(".products") or soup
for a in products_container.select("a[href$='.html'], a[href$='.htm']"):
href = a.get("href")
if href.startswith("/"):
href = urljoin(config()["base_url"], href)
            slug = product_slug_from_href(href)
            if not slug:
                continue
            KNOWN_PRODUCT_SLUGS.add(slug)
            if slug not in seen_slugs:
seen_slugs.add(slug)
items.append(slug)
# Tertiary: JSON-LD fallback (ItemList/Product)
    if not items:
def add_product(name: Optional[str], url: Optional[str], image: Optional[str]):
if not url:
return
absu = urljoin(config()["base_url"], url) if url.startswith("/") else url
slug = product_slug_from_href(absu)
if not slug:
return
KNOWN_PRODUCT_SLUGS.add(slug)
if slug not in seen_slugs:
seen_slugs.add(slug)
items.append(slug)
for script in soup.find_all("script", attrs={"type": "application/ld+json"}):
            try:
                data = json.loads(script.get_text())
            except Exception:
                continue
if isinstance(data, dict):
if data.get("@type") == "ItemList" and isinstance(data.get("itemListElement"), list):
for it in data["itemListElement"]:
if isinstance(it, dict):
ent = it.get("item") or it
if isinstance(ent, dict):
add_product(
ent.get("name"),
ent.get("url"),
(ent.get("image") if isinstance(ent.get("image"), str) else None),
)
if data.get("@type") == "Product":
add_product(
data.get("name"),
data.get("url"),
(data.get("image") if isinstance(data.get("image"), str) else None),
)
elif isinstance(data, list):
for ent in data:
if not isinstance(ent, dict):
continue
if ent.get("@type") == "Product":
add_product(
ent.get("name"),
ent.get("url"),
(ent.get("image") if isinstance(ent.get("image"), str) else None),
)
if ent.get("@type") == "ItemList":
for it in ent.get("itemListElement", []):
if isinstance(it, dict):
obj = it.get("item") or it
if isinstance(obj, dict):
add_product(
obj.get("name"),
obj.get("url"),
(obj.get("image") if isinstance(obj.get("image"), str) else None),
)
return items
def _with_query(url: str, add: Dict[str, str]) -> str:
p = urlparse(url)
q = dict(parse_qsl(p.query, keep_blank_values=True))
q.update(add)
new_q = urlencode(q)
return urlunparse((p.scheme, p.netloc, p.path, p.params, new_q, p.fragment))
def _with_page(url: str, page: int) -> str:
if page and page > 1:
return _with_query(url, {"p": str(page)})
return url
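# Illustrative (hypothetical URL): existing query keys are overwritten, and
# page 1 is left unpaginated.
#   >>> _with_page("https://shop.example/tea.html?p=2", 3)
#   'https://shop.example/tea.html?p=3'
#   >>> _with_page("https://shop.example/tea.html", 1)
#   'https://shop.example/tea.html'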
def _listing_base_key(url: str) -> str:
p = urlparse(url)
path = p.path.rstrip("/")
return f"{p.scheme}://{p.netloc}{path}".lower()
def _variant_cache_get(base_key: str) -> Optional[str]:
info = _listing_variant_cache.get(base_key)
if not info:
return None
url, ts = info
if (now() - ts) > _listing_variant_ttl:
_listing_variant_cache.pop(base_key, None)
return None
return url
def _variant_cache_set(base_key: str, working_url: str) -> None:
_listing_variant_cache[base_key] = (working_url, now())
def _page_cache_get(working_url: str, page: int) -> Optional[Tuple[List[str], int]]:
key = f"{working_url}|p={page}"
info = _listing_page_cache.get(key)
if not info:
return None
(items, total_pages), ts = info
if (now() - ts) > _listing_page_ttl:
_listing_page_cache.pop(key, None)
return None
return items, total_pages
def _page_cache_set(working_url: str, page: int, items: List[str], total_pages: int) -> None:
key = f"{working_url}|p={page}"
_listing_page_cache[key] = ((items, total_pages), now())
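# Illustrative cache round trip (hypothetical values): an entry written with
# _page_cache_set(url, 1, items, 4) is returned by _page_cache_get(url, 1) as
# (items, 4) until _listing_page_ttl seconds pass, after which the stale
# entry is evicted and None comes back.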
async def _fetch_parse(url: str, page: int):
html = await fetch(_with_page(url, page))
soup = soup_of(html)
items = _parse_cards_from_soup(soup)
return items, soup
async def scrape_products(list_url: str, page: int = 1) -> Tuple[List[str], int]:
    """Fast listing fetch with a short-lived per-page result cache."""
    cached = _page_cache_get(list_url, page)
    if cached:
        return cached
    items, soup = await _fetch_parse(list_url, page)
    total_pages = _derive_total_pages(soup)
    _page_cache_set(list_url, page, items, total_pages)
    return items, total_pages
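# Illustrative usage from an async caller (URL hypothetical):
#   slugs, total_pages = await scrape_products("https://shop.example/tea.html")
#   for slug in slugs:
#       ...  # e.g. queue each slug for a product-detail fetch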
def _derive_total_pages(soup) -> int:
total_pages = 1
textdump = normalize_text(soup.get_text(" "))
pages_from_text = parse_total_pages_from_text(textdump)
if pages_from_text:
total_pages = pages_from_text
else:
pages = {1}
for a in soup.find_all("a", href=True):
m = re.search(r"[?&]p=(\d+)", a["href"])
if m:
pages.add(int(m.group(1)))
total_pages = max(pages) if pages else 1
return total_pages
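# Illustrative fallback (hypothetical markup): when no "Showing X of Y" text
# exists, pager links are scanned instead, so anchors with hrefs ending in
# ?p=2 and ?p=5 yield total_pages == 5.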
def _slugs_from_list_url(list_url: str) -> Tuple[str, Optional[str]]:
p = urlparse(list_url)
parts = [x for x in (p.path or "").split("/") if x]
top = parts[0].lower() if parts else ""
sub = None
if len(parts) >= 2:
sub = parts[1]
        if sub.lower().endswith((".html", ".htm")):
            sub = re.sub(r"\.html?$", "", sub, flags=re.I)
return top, sub
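# Illustrative (hypothetical URL):
#   >>> _slugs_from_list_url("https://shop.example/tea/green-tea.html")
#   ('tea', 'green-tea')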