feat: initialize market app with browsing, product, and scraping code
Some checks failed
Build and Deploy / build-and-deploy (push) Has been cancelled

Split from coop monolith. Includes:
- Market/browse/product blueprints
- Product sync API
- Suma scraping pipeline
- Templates for market, browse, and product views
- Dockerfile and CI workflow for independent deployment
This commit is contained in:
giles
2026-02-09 23:16:34 +00:00
commit 6271a715a1
142 changed files with 8517 additions and 0 deletions

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,13 @@
# Auto-import all extractor modules so they register themselves.
from .title import ex_title # noqa: F401
from .images import ex_images # noqa: F401
from .short_description import ex_short_description # noqa: F401
from .description_sections import ex_description_sections # noqa: F401
from .nutrition_ex import ex_nutrition # noqa: F401
from .stickers import ex_stickers # noqa: F401
from .labels import ex_labels # noqa: F401
from .info_table import ex_info_table # noqa: F401
from .oe_list_price import ex_oe_list_price # noqa: F401
from .regular_price_fallback import ex_regular_price_fallback # noqa: F401
from .breadcrumbs import ex_breadcrumbs # noqa: F401

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
from typing import Dict, List, Union
from urllib.parse import urlparse
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
@extractor
def ex_breadcrumbs(soup: BeautifulSoup, url: str) -> Dict:
    """
    Derive category fields from the page's breadcrumb trail.

    Only crumbs whose link has a non-empty final path segment are recorded.
    The first two recorded crumbs that are not "home" become the top and sub
    category. Returns an empty dict when no breadcrumb list is found.
    """
    trail = None
    for selector in (".breadcrumbs ul.items", "nav.breadcrumbs ul.items", "ul.items"):
        trail = soup.select_one(selector)
        if trail:
            break
    if not trail:
        return {}

    crumbs = []
    for item in trail.select("li.item"):
        anchor = item.find("a")
        if anchor:
            title = normalize_text(anchor.get("title") or anchor.get_text())
            href = anchor.get("href")
        else:
            title = normalize_text(item.get_text())
            href = None

        slug = None
        if href:
            try:
                trimmed = (urlparse(href).path or "").strip("/")
                slug = trimmed.split("/")[-1] if trimmed else None
            except Exception:
                slug = None
        if not slug:
            # Crumbs without a usable link segment (e.g. the current page) are dropped.
            continue
        crumbs.append({"title": title or None, "href": href or None, "slug": slug})

    def _is_home(crumb) -> bool:
        return (
            (crumb.get("title") or "").lower() == "home"
            or (crumb.get("slug") or "").lower() in ("", "home")
        )

    categories = [c for c in crumbs if c.get("href") and not _is_home(c)]
    top = categories[0] if categories else None
    sub = categories[1] if len(categories) > 1 else None

    out = {"category_breadcrumbs": crumbs}
    if top:
        out["category_top_title"] = top.get("title")
        out["category_top_href"] = top.get("href")
        out["category_top_slug"] = top.get("slug")
    if sub:
        out["category_sub_title"] = sub.get("title")
        out["category_sub_href"] = sub.get("href")
        out["category_sub_slug"] = sub.get("slug")
    if top and sub:
        out["category_path"] = f"{(top.get('slug') or '').strip()}/{(sub.get('slug') or '').strip()}"
    return out

View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from typing import Dict, List
from bs4 import BeautifulSoup
from utils import normalize_text
from ...html_utils import absolutize_fragment
from ..registry import extractor
from ..helpers.desc import (
split_description_container, find_description_container,
pair_title_content_from_magento_tabs, scan_headings_for_sections,
additional_attributes_table,
)
from ..helpers.text import clean_title, is_blacklisted_heading
@extractor
def ex_description_sections(soup: BeautifulSoup, url: str) -> Dict:
    """
    Collect the main description HTML and the titled detail sections of a page.

    Sources, in order: the description container (split into open HTML plus
    accordion sections), then Magento tabs or, when there are none, a generic
    heading scan, and finally the additional-attributes table.

    Returns ``{"sections": [...]}`` plus ``"description_html"`` when one was found.
    """
    description_html = None
    sections: List[Dict] = []

    desc_el = find_description_container(soup)
    if desc_el:
        open_html, sections_from_desc = split_description_container(desc_el)
        description_html = open_html or None
        sections.extend(sections_from_desc)

    # Keys are lower-cased *cleaned* titles, the same form that is stored in
    # ``sections``, so "Ingredients:" and "Ingredients" count as one section.
    seen = {s["title"].lower() for s in sections}

    candidates = pair_title_content_from_magento_tabs(soup) or scan_headings_for_sections(soup)
    for raw_title, html_fragment in candidates:
        low = raw_title.lower()
        if "product description" in low or low == "description" or "details" in low:
            # Description-like blocks feed description_html instead of sections.
            if not description_html and html_fragment:
                description_html = absolutize_fragment(html_fragment)
            continue

        title = clean_title(raw_title)
        key = title.lower()
        if key in seen:
            continue
        if not normalize_text(BeautifulSoup(html_fragment, "lxml").get_text()):
            continue  # no visible text in the fragment
        if is_blacklisted_heading(raw_title):
            continue
        sections.append({"title": title, "html": absolutize_fragment(html_fragment)})
        seen.add(key)

    addl = additional_attributes_table(soup)
    if addl and "additional information" not in seen and not is_blacklisted_heading("additional information"):
        sections.append({"title": "Additional Information", "html": addl})

    out = {"sections": sections}
    if description_html:
        out["description_html"] = description_html
    return out

View File

@@ -0,0 +1,89 @@
from __future__ import annotations
import json, re
from typing import Dict, List
from bs4 import BeautifulSoup
from ..registry import extractor
from ..helpers.html import abs_url, collect_img_candidates, dedup_by_filename
# Start of the gallery "data" array inside a text/x-magento-init script.
_GALLERY_DATA_RE = re.compile(r'"data"\s*:\s*(?=\[)')
# Shortest bracketed span; only used when the array is not valid JSON.
_GALLERY_ARRAY_RE = re.compile(r'\[[\s\S]*?\]')
_FULL_URL_RE = re.compile(r'"full"\s*:\s*"([^"]+)"')
_IMG_URL_RE = re.compile(r'"img"\s*:\s*"([^"]+)"')


def _gallery_image_candidates(text: str) -> List[str]:
    """Return raw image URLs from the ``"data"`` array of a gallery init script."""
    start = _GALLERY_DATA_RE.search(text)
    if not start:
        return []
    tail = text[start.end():]

    # raw_decode parses exactly one JSON value from the start of the string, so
    # nested arrays or "]" inside strings no longer truncate the gallery data.
    try:
        data, _end = json.JSONDecoder().raw_decode(tail)
    except ValueError:
        data = None
    if isinstance(data, list):
        urls = []
        for entry in data:
            if isinstance(entry, dict):
                candidate = entry.get("full") or entry.get("img")
                if candidate:
                    urls.append(candidate)
        return urls

    # Not valid JSON: fall back to plain key extraction on the bracketed text.
    chunk = _GALLERY_ARRAY_RE.match(tail)
    if not chunk:
        return []
    arr_txt = chunk.group(0)
    found = _FULL_URL_RE.findall(arr_txt) or _IMG_URL_RE.findall(arr_txt)
    # The text is still JSON-encoded here, so undo the "\/" slash escape.
    return [u.replace("\\/", "/") for u in found]


def _jsonld_image_candidates(value) -> List[str]:
    """Return raw URLs from a JSON-LD ``image`` value (str, dict with "url", or a list of those)."""
    if isinstance(value, str):
        return [value]
    if isinstance(value, dict):
        return [value["url"]] if "url" in value else []
    urls = []
    if isinstance(value, list):
        for item in value:
            if isinstance(item, str):
                urls.append(item)
            elif isinstance(item, dict) and "url" in item:
                urls.append(item["url"])
    return urls


@extractor
def ex_images(soup: BeautifulSoup, url: str) -> Dict:
    """
    Collect product image URLs.

    Tries, in order, stopping at the first source that yields anything:
      1. the Magento gallery init script,
      2. JSON-LD ``image`` entries,
      3. ``<img>`` elements in the usual gallery containers.

    Returns ``{"images": [...], "image": first image or None}``; the list is
    de-duplicated with ``dedup_by_filename``.
    """
    images: List[str] = []

    # 1) Magento init script (gallery)
    for script in soup.find_all("script", attrs={"type": "text/x-magento-init"}):
        text = script.string or script.get_text() or ""
        if "mage/gallery/gallery" not in text:
            continue
        found = [u for u in map(abs_url, _gallery_image_candidates(text)) if u]
        if found:
            images.extend(found)
            break  # one gallery block is enough

    # 2) JSON-LD fallback
    if not images:
        for script in soup.find_all("script", attrs={"type": "application/ld+json"}):
            raw = script.string or script.get_text() or ""
            try:
                data = json.loads(raw)
            except (ValueError, RecursionError):
                continue
            nodes = data if isinstance(data, list) else [data]
            for node in nodes:
                if isinstance(node, dict) and "image" in node:
                    for candidate in _jsonld_image_candidates(node["image"]):
                        resolved = abs_url(candidate)
                        if resolved:
                            images.append(resolved)

    # 3) Generic DOM scan fallback
    if not images:
        for el in soup.select(".product.media img, .gallery-placeholder img, .fotorama__stage img"):
            for candidate in collect_img_candidates(el):
                resolved = abs_url(candidate)
                if resolved:
                    images.append(resolved)

    images = dedup_by_filename(images)
    return {"images": images, "image": images[0] if images else None}

View File

@@ -0,0 +1,76 @@
from __future__ import annotations
from typing import Dict, Union
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
from ..helpers.price import parse_price, parse_case_size
@extractor
def ex_info_table(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read the label/content rows of ``<div class="product-page-info-table">``.

    Output keys:
      info_table (label -> text map), brand,
      rrp / rrp_raw / rrp_currency,
      price_per_unit / price_per_unit_raw / price_per_unit_currency,
      case_size_raw / case_size_count / case_size_item_qty / case_size_item_unit.

    Returns an empty dict when the table or its rows are missing.
    """
    table = soup.select_one(".product-page-info-table")
    if not table:
        return {}
    row_host = table.select_one(".product-page-info-table-rows") or table
    rows = row_host.select(".product-page-info-table-row")
    if not rows:
        return {}

    raw_map: Dict[str, str] = {}
    for row in rows:
        label_el = row.select_one(".product-page-info-table__label")
        value_el = row.select_one(".product-page-info-table__content")
        if label_el and value_el:
            label = normalize_text(label_el.get_text())
            value = normalize_text(value_el.get_text())
            if label:
                raw_map[label] = value

    out = {"info_table": raw_map}

    brand = raw_map.get("Brand") or raw_map.get("Brand Name")
    if brand:
        out["brand"] = brand

    # (source text, raw key, value key, currency key) for each price-like row.
    price_fields = (
        (raw_map.get("RRP", ""), "rrp_raw", "rrp", "rrp_currency"),
        (
            raw_map.get("Price Per Unit", "") or raw_map.get("Unit Price", ""),
            "price_per_unit_raw",
            "price_per_unit",
            "price_per_unit_currency",
        ),
    )
    for text, raw_key, value_key, currency_key in price_fields:
        amount, currency, raw = parse_price(text)
        if not raw or (amount is None and currency is None):
            continue
        out[raw_key] = raw
        if amount is not None:
            out[value_key] = amount
        if currency:
            out[currency_key] = currency

    case_text = raw_map.get("Case Size", "") or raw_map.get("Pack Size", "")
    count, item_qty, item_unit, case_raw = parse_case_size(case_text)
    if case_raw:
        out["case_size_raw"] = case_raw
    if count is not None:
        out["case_size_count"] = count
    if item_qty is not None:
        out["case_size_item_qty"] = item_qty
    if item_unit:
        out["case_size_item_unit"] = item_unit
    return out

View File

@@ -0,0 +1,41 @@
from __future__ import annotations
from typing import Dict, List
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
@extractor
def ex_labels(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read product labels from markup such as:
      <ul class="cdz-product-labels">
        <li class="label-item new"><div class="label-content">NEW</div></li>
      </ul>

    Returns ``{"labels": [...]}``: the lower-cased, order-preserving union of
    the extra CSS classes and the visible texts. Returns an empty dict when
    the list is absent or yields nothing.
    """
    root = soup.select_one("ul.cdz-product-labels")
    if not root:
        return {}

    class_hints: List[str] = []
    visible: List[str] = []
    for li in root.select("li.label-item"):
        for cls in li.get("class") or []:
            cls = (cls or "").strip()
            if not cls or cls.lower() == "label-item" or cls in class_hints:
                continue
            class_hints.append(cls)
        text = normalize_text(li.get_text())
        if text and text not in visible:
            visible.append(text)

    if not (class_hints or visible):
        return {}

    keys = ((s or "").strip().lower() for s in class_hints + [t.lower() for t in visible])
    # dict.fromkeys keeps first-seen order while dropping repeats.
    return {"labels": list(dict.fromkeys(k for k in keys if k))}

View File

@@ -0,0 +1,129 @@
from __future__ import annotations
from typing import Dict, List, Optional, Tuple
import re
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
from ..helpers.desc import (
split_description_container, find_description_container,
pair_title_content_from_magento_tabs, scan_headings_for_sections,
)
# ----- value/unit parser ------------------------------------------------------
_NUM_UNIT_RE = re.compile(
r"""
^\s*
(?P<num>[-+]?\d{1,3}(?:[.,]\d{3})*(?:[.,]\d+)?|\d+(?:[.,]\d+)?)
\s*
(?P<unit>[a-zA-Z%µ/]+)?
\s*$
""",
re.X,
)
def _parse_value_unit(s: str) -> Tuple[Optional[str], Optional[str]]:
if not s:
return None, None
s = re.sub(r"\s+", " ", s.strip())
m = _NUM_UNIT_RE.match(s)
if not m:
return None, None
num = (m.group("num") or "").replace(",", "")
unit = m.group("unit") or None
if unit:
u = unit.lower()
if u in {"kcal", "kcal.", "kcalories", "kcalorie"}:
unit = "kcal"
elif u in {"kj", "kj.", "kilojoule", "kilojoules"}:
unit = "kJ"
return (num or None, unit)
# ----- section finder ---------------------------------------------------------
def _find_nutrition_section_html(soup: BeautifulSoup) -> Optional[str]:
    """
    Locate the HTML of the section titled 'Nutritional Information'.

    Searches Magento tabs first, then the sections split out of the
    description container, then a generic heading scan. Returns ``None``
    when no such section exists.
    """
    def names_nutrition(raw_title) -> bool:
        return "nutritional information" in normalize_text(raw_title).rstrip(":").lower()

    # 1) Magento tabs
    for heading, fragment in pair_title_content_from_magento_tabs(soup) or []:
        if heading and fragment and names_nutrition(heading):
            return fragment

    # 2) Sections split out of the description container
    container = find_description_container(soup)
    if container:
        _open_html, parts = split_description_container(container)
        for part in parts or []:
            if names_nutrition(part.get("title") or ""):
                return part.get("html") or ""

    # 3) Generic heading scan
    for heading, fragment in scan_headings_for_sections(soup) or []:
        if heading and fragment and names_nutrition(heading):
            return fragment

    return None
# ----- table parser -----------------------------------------------------------
def _extract_rows_from_table(root: BeautifulSoup) -> List[Dict[str, str]]:
    """
    Turn the first ``<table>`` under ``root`` into key/value/unit rows.

    A row is read as (th, first td) when it has a header cell, otherwise as
    (first td, second td). Values that do not parse as "number [unit]" are
    kept verbatim with ``unit`` set to None. Exact repeats are dropped,
    keeping the first occurrence.
    """
    table = root.select_one("table")
    if not table:
        return []

    rows: List[Dict[str, str]] = []
    seen = set()
    for tr in table.select("tr"):
        header = tr.find("th")
        cells = tr.find_all("td")
        if header and cells:
            key_el, val_el = header, cells[0]
        elif len(cells) >= 2:
            key_el, val_el = cells[0], cells[1]
        else:
            continue

        key = normalize_text(key_el.get_text(" ").strip())
        val_raw = normalize_text(val_el.get_text(" ").strip())
        if not (key and val_raw):
            continue

        value, unit = _parse_value_unit(val_raw)
        if value is None:
            value, unit = val_raw, None

        fingerprint = (key, value, unit)
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        rows.append({"key": key, "value": value, "unit": unit})
    return rows
# ----- extractor --------------------------------------------------------------
@extractor
def ex_nutrition(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read nutrition rows from the 'Nutritional Information' section only.

    Returns ``{"nutrition": [{"key": ..., "value": ..., "unit": ...}, ...]}``;
    the list is empty when the section is missing or empty.
    """
    rows = []
    section_html = _find_nutrition_section_html(soup)
    if section_html:
        rows = _extract_rows_from_table(BeautifulSoup(section_html, "lxml"))
    return {"nutrition": rows}

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
from typing import Dict, Union
from bs4 import BeautifulSoup
from ..registry import extractor
from ..helpers.price import parse_price
@extractor
def ex_oe_list_price(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read the Magento "oe-list-price" block:
      <div class="oe-list-price">
        <div class="rrp-price"><label>Regular Price: </label><span class="price">£30.50</span></div>
        <div class="oe-final-price"><label>Special Price: </label><span>£23.63</span></div>
      </div>

    Result:
      oe_list_price: { rrp_raw, rrp, rrp_currency, special_raw, special, special_currency }
      plus the special price copied to the top level as special_price,
      special_price_currency and special_price_raw.
    Returns an empty dict when the block is absent.
    """
    box = soup.select_one(".oe-list-price")
    if not box:
        return {}

    def read_price(node):
        # Prefer the dedicated price span, then any span, then the node itself.
        holder = node.select_one("span.price") or node.select_one("span") or node
        return parse_price(holder.get_text(strip=True))

    promoted = {}
    nested = {}

    rrp_node = box.select_one(".rrp-price")
    if rrp_node:
        amount, currency, raw = read_price(rrp_node)
        if raw:
            nested["rrp_raw"] = raw
        if amount is not None:
            nested["rrp"] = amount
        if currency:
            nested["rrp_currency"] = currency

    special_node = box.select_one(".oe-final-price, .special-price, .final-price")
    if special_node:
        amount, currency, raw = read_price(special_node)
        if raw:
            nested["special_raw"] = raw
        if amount is not None:
            nested["special"] = amount
            promoted["special_price"] = amount
        if currency:
            nested["special_currency"] = currency
            promoted["special_price_currency"] = currency
        if raw:
            promoted["special_price_raw"] = raw

    if nested:
        promoted["oe_list_price"] = nested
    return promoted

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from typing import Dict, Union
from bs4 import BeautifulSoup
from ..registry import extractor
from ..helpers.price import parse_price
@extractor
def ex_regular_price_fallback(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read a standalone 'Regular Price' block:
      <div class="rrp-price"><label>Regular Price: </label><span class="price">£16.55</span></div>

    The parsed price is reported as regular_price* and mirrored as rrp*.
    Returns an empty dict when no such block exists.
    """
    block = soup.select_one("div.rrp-price")
    if not block:
        return {}

    price_span = block.select_one("span.price")
    if price_span:
        price_text = price_span.get_text(strip=True)
    else:
        price_text = block.get_text(" ", strip=True)

    value, currency, raw = parse_price(price_text or "")
    has_value = value is not None

    out = {}
    if raw:
        out["regular_price_raw"] = raw
    if has_value:
        out["regular_price"] = value
    if currency:
        out["regular_price_currency"] = currency

    # Mirror under the rrp* names.
    if has_value:
        out["rrp"] = value
    if currency:
        out["rrp_currency"] = currency
    if raw:
        out["rrp_raw"] = raw
    return out

View File

@@ -0,0 +1,19 @@
from __future__ import annotations
from typing import Dict
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
@extractor
def ex_short_description(soup: BeautifulSoup, url: str) -> Dict:
    """
    Return ``{"description_short": text}`` from the first source that yields
    non-empty normalised text: the description/overview attribute blocks,
    then the description meta tags.
    """
    selectors = (
        ".product.attribute.description .value",
        ".product.attribute.overview .value",
        "meta[name='description']",
        "meta[property='og:description']",
    )
    desc_short = None
    for sel in selectors:
        el = soup.select_one(sel)
        if not el:
            continue
        if el.name == "meta":
            source = el.get("content")
        else:
            source = el.get_text()
        desc_short = normalize_text(source)
        if desc_short:
            break
    return {"description_short": desc_short}

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from typing import Dict, List
from bs4 import BeautifulSoup
from ..registry import extractor
@extractor
def ex_stickers(soup: BeautifulSoup, url: str) -> Dict:
    """
    Read sticker names from:
      <div class="stickers">
        <span class="sticker xxx"></span>
        ...
      </div>

    Each span contributes its CSS classes other than "sticker" plus its
    ``data-sticker`` attribute. Returns ``{"stickers": [...]}`` in first-seen
    order without repeats; the list is empty when the container is missing.
    """
    root = soup.select_one("div.stickers")
    if not root:
        return {"stickers": []}

    ordered = {}  # insertion-ordered set of names
    for span in root.select("span.sticker"):
        names = [c.strip() for c in (span.get("class") or []) if c and c.lower() != "sticker"]
        data_name = (span.get("data-sticker") or "").strip()
        if data_name:
            names.append(data_name)
        for name in names:
            if name:
                ordered.setdefault(name, None)
    return {"stickers": list(ordered)}

View File

@@ -0,0 +1,17 @@
from __future__ import annotations
from typing import Dict
from bs4 import BeautifulSoup
from utils import normalize_text
from ..registry import extractor
@extractor
def ex_title(soup: BeautifulSoup, url: str) -> Dict:
    """
    Return ``{"title": ...}`` from the first selector that yields non-empty text.

    Heading elements are tried before the ``og:title`` meta tag. Every
    candidate, including the meta ``content`` value, goes through
    ``normalize_text``. Falls back to "Product" when nothing is found.
    """
    for sel in ["h1.page-title span", "h1.page-title", "h1.product-name", "meta[property='og:title']"]:
        el = soup.select_one(sel)
        if not el:
            continue
        raw = el.get("content") if el.name == "meta" else el.get_text()
        # A meta tag may lack a content attribute; treat that as empty text.
        title = normalize_text(raw or "")
        if title:
            return {"title": title}
    return {"title": "Product"}

View File

@@ -0,0 +1,165 @@
from __future__ import annotations
from typing import Dict, List, Optional, Tuple
from bs4 import BeautifulSoup, NavigableString, Tag
from utils import normalize_text
from ...html_utils import absolutize_fragment
from .text import clean_title, is_blacklisted_heading
from config import config
def split_description_container(desc_el: Tag) -> Tuple[str, List[Dict]]:
    """
    Separate accordion sections from the rest of a description container.

    Each element with class 'accordion-title' is paired with the first
    element sibling that follows it, provided that sibling has class
    'accordion-details'.

    Returns ``(open_html, sections)``:
      - open_html: the description HTML with the paired accordion nodes removed
      - sections: [{"title": ..., "html": ...}, ...] for pairs with visible text
    """
    # Re-parse the inner HTML so the caller's tree is never modified.
    frag = BeautifulSoup(desc_el.decode_contents(), "lxml")

    def first_element_after(node):
        cursor = node.next_sibling
        while cursor is not None and not isinstance(cursor, Tag):
            cursor = cursor.next_sibling
        return cursor

    # Collect every (heading, details) pair before removing anything.
    matched: List[Tuple[Tag, Tag]] = []
    title_selector = "#accordion .accordion-title, .accordion .accordion-title, h5.accordion-title, .accordion-title"
    for heading in frag.select(title_selector):
        if not isinstance(heading, Tag):
            continue
        if not clean_title((heading.get_text() or "").strip()):
            continue
        neighbour = first_element_after(heading)
        if neighbour is not None and "accordion-details" in (neighbour.get("class") or []):
            matched.append((heading, neighbour))

    sections: List[Dict] = []
    for heading, details in matched:
        inner = details.decode_contents()
        # Keep only sections that contain visible text.
        if normalize_text(BeautifulSoup(inner, "lxml").get_text()):
            sections.append({
                "title": clean_title(heading.get_text() or ""),
                "html": absolutize_fragment(inner),
            })
        details.decompose()
        heading.decompose()

    open_html = absolutize_fragment(str(frag)) if frag else ""
    return open_html, sections
def pair_title_content_from_magento_tabs(soup: BeautifulSoup):
    """
    Return ``[(title, html), ...]`` for the tabs of a Magento product page.

    A tab's content is found through the id named by its ``aria-controls`` or
    ``data-target`` attribute, or failing that the next sibling carrying the
    classes ``data item content``. Blacklisted titles are skipped. Returns an
    empty list when the tab container is missing.
    """
    out = []
    container = soup.select_one(".product.info.detailed .product.data.items") or soup.select_one(".product.data.items")
    if not container:
        return out

    def _is_tab_content(node) -> bool:
        if not isinstance(node, Tag):
            return False
        classes = node.get("class", [])
        return all(name in classes for name in ("data", "item", "content"))

    for t in container.select(".data.item.title"):
        title = normalize_text(t.get_text())
        if not title:
            continue

        # Look the id up as an attribute value rather than through a "#id"
        # CSS selector: ids containing dots or colons would be misparsed as
        # selector syntax, and a "#"-prefixed data-target would raise.
        content_id = (t.get("aria-controls") or t.get("data-target") or "").lstrip("#")
        content = soup.find(id=content_id) if content_id else None
        if content is None:
            content = t.find_next_sibling(_is_tab_content)

        if content is not None and not is_blacklisted_heading(title):
            out.append((title, absolutize_fragment(content.decode_contents())))
    return out
def scan_headings_for_sections(soup: BeautifulSoup):
    """
    Return ``[(title, html), ...]`` for h2-h6 headings whose title contains a
    configured section keyword or one of "product description",
    "description", "details".

    A section's HTML is everything between the heading and the next h2-h6
    sibling. Sections with empty HTML or a blacklisted title are skipped.
    """
    from html import escape

    container = (
        soup.select_one(".product.info.detailed")
        or soup.select_one(".product-info-main")
        or soup.select_one(".page-main")
        or soup
    )
    heading_names = ("h2", "h3", "h4", "h5", "h6")

    # Titles are compared lower-cased, so the configured keywords must be too.
    configured = config().get("section-titles") or []
    keywords = [str(k).lower() for k in configured if k]
    keywords += ["product description", "description", "details"]

    out = []
    for h in container.select("h2, h3, h4, h5, h6"):
        title = clean_title(h.get_text() or "")
        if not title:
            continue
        low = title.lower()
        if not any(k in low for k in keywords):
            continue

        parts: List[str] = []
        for sib in h.next_siblings:
            if isinstance(sib, Tag):
                if sib.name in heading_names:
                    break
                parts.append(str(sib))
            elif type(sib) is NavigableString:
                # Parsed text is entity-decoded; re-escape it before it is
                # concatenated back into markup.
                parts.append(escape(str(sib), quote=False))
            # Other string nodes (e.g. comments) are skipped: str() on them
            # yields the bare text, which would show up as visible content.

        fragment = absolutize_fragment("".join(parts).strip())
        if fragment and not is_blacklisted_heading(title):
            out.append((title, fragment))
    return out
def additional_attributes_table(soup: BeautifulSoup) -> Optional[str]:
    """
    Render the page's additional-attributes table as a block of styled divs.

    Each row contributes (first th or td, last td) as key and value; rows with
    an empty key or value are dropped. Key and value text is HTML-escaped
    before being placed in the markup. Returns ``None`` when the table is
    missing, has no usable rows, or cannot be processed.
    """
    from html import escape

    table = soup.select_one(".additional-attributes, table.additional-attributes, .product.attribute.additional table")
    if not table:
        return None
    try:
        rows = []
        for tr in table.select("tr"):
            th = tr.find("th") or tr.find("td")
            tds = tr.find_all("td")
            key = normalize_text(th.get_text()) if th else None
            val = normalize_text(tds[-1].get_text()) if tds else None
            if key and val:
                rows.append((key, val))
        if not rows:
            return None
        # get_text() returns plain text, so it must be escaped on the way
        # back into HTML.
        items = "\n".join(
            "<div class='grid grid-cols-3 gap-2 py-1 border-b'>\n"
            f"<div class='col-span-1 font-medium'>{escape(key, quote=False)}</div>\n"
            f"<div class='col-span-2 text-stone-700'>{escape(val, quote=False)}</div>\n"
            "</div>"
            for key, val in rows
        )
        return f"<div class='rounded-lg border bg-white'>{items}</div>"
    except Exception:
        # Best effort: an unexpected table shape yields no section.
        return None
def find_description_container(soup: BeautifulSoup) -> Optional[Tag]:
    """
    Return the element that holds the product description, or ``None``.

    Known description selectors are tried first. Failing that, the nodes
    following a "Product description" / "Description" heading, up to the next
    h2-h6 sibling, are gathered into a new ``<div>``. Candidates without
    visible text are ignored.
    """
    import copy

    for sel in ["#description", "#tab-description", ".product.attribute.description .value",
                ".product.attribute.overview .value", ".product.info.detailed .value"]:
        el = soup.select_one(sel)
        if el and normalize_text(el.get_text()):
            return el

    for h in soup.select("h2, h3, h4, h5, h6"):
        txt = normalize_text(h.get_text()).lower()
        if txt.startswith("product description") or txt == "description":
            wrapper = soup.new_tag("div")
            for sib in h.next_siblings:
                if isinstance(sib, Tag) and sib.name in ("h2", "h3", "h4", "h5", "h6"):
                    break
                # Append a copy: appending the live node would move it out of
                # the document, changing the caller's tree and disturbing
                # this sibling walk.
                wrapper.append(copy.copy(sib))
            if normalize_text(wrapper.get_text()):
                return wrapper
    return None

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
from typing import List, Optional
from urllib.parse import urljoin, urlparse
from config import config
def first_from_srcset(val: str) -> Optional[str]:
    """Return the URL of the first ``srcset`` candidate, or ``None`` for empty input."""
    if not val:
        return None
    head = val.partition(",")[0].strip()
    tokens = head.split()
    if tokens:
        return tokens[0]
    return head
def abs_url(u: Optional[str]) -> Optional[str]:
    """
    Resolve a root-relative URL against the configured ``base_url``.

    Falsy input gives ``None``; anything not starting with "/" is returned
    unchanged.
    """
    if not u:
        return None
    if isinstance(u, str) and u.startswith("/"):
        return urljoin(config()["base_url"], u)
    return u
def collect_img_candidates(el) -> List[str]:
    """
    List the possible image URLs carried by an element's attributes.

    Plain URL attributes come first, in a fixed order, followed by the first
    candidate of ``srcset`` and ``data-srcset``. Returns an empty list for a
    falsy element.
    """
    if not el:
        return []

    direct_attrs = ("src", "data-src", "data-original", "data-zoom-image", "data-thumb", "content", "href")
    urls: List[str] = [value for value in (el.get(name) for name in direct_attrs) if value]

    for name in ("srcset", "data-srcset"):
        raw = el.get(name)
        candidate = first_from_srcset(raw) if raw else None
        if candidate:
            urls.append(candidate)
    return urls
def _filename_key(u: str) -> str:
p = urlparse(u)
path = p.path or ""
if path.endswith("/"):
path = path[:-1]
last = path.split("/")[-1]
return f"{p.netloc}:{last}".lower()
def dedup_by_filename(urls: List[str]) -> List[str]:
seen = set()
out: List[str] = []
for u in urls:
k = _filename_key(u)
if k in seen:
continue
seen.add(k)
out.append(u)
return out

View File

@@ -0,0 +1,42 @@
from __future__ import annotations
import re
from typing import Optional, Tuple
_CURRENCY_BY_SYMBOL = {"£": "GBP", "€": "EUR", "$": "USD"}
_PRICE_RE = re.compile(r'([£€$])?\s*([0-9][0-9.,]*)')


def parse_price(text: str) -> Tuple[Optional[float], Optional[str], str]:
    """
    Return ``(value, currency, raw)`` from a price-like string.

    ``raw`` is the stripped input. ``value`` is the first number found with
    commas removed, or ``None`` when there is no parseable number.
    ``currency`` is the ISO code for a leading £, € or $ and ``None`` when
    the number carries no recognised symbol.
    """
    raw = (text or "").strip()
    m = _PRICE_RE.search(raw)
    if not m:
        return None, None, raw
    try:
        value = float(m.group(2).replace(",", ""))
    except ValueError:
        # e.g. "1.2.3": digits and separators that do not form a number
        return None, None, raw
    # group(1) is None when no symbol preceded the number -> currency None.
    return value, _CURRENCY_BY_SYMBOL.get(m.group(1)), raw
# count, multiplication sign (x, X or ×), per-item quantity, unit
_CASE_SIZE_RE = re.compile(r"(\d+)\s*[xX×]\s*([0-9]*\.?[0-9]+)\s*([a-zA-Z]+)")


def parse_case_size(text: str) -> Tuple[Optional[int], Optional[float], Optional[str], str]:
    """
    Parse strings like "6 x 500g", "12x1L", "24 × 330 ml".

    Returns ``(count, item_qty, item_unit, raw)`` where ``raw`` is the
    stripped input; the first three are ``None`` when the text does not
    contain a "<count> x <qty><unit>" pattern.
    """
    raw = (text or "").strip()
    if not raw:
        return None, None, None, raw
    # The separator is matched in place, so an "x" inside a unit word
    # ("box") stays part of that word.
    m = _CASE_SIZE_RE.search(raw)
    if not m:
        return None, None, None, raw
    return int(m.group(1)), float(m.group(2)), m.group(3), raw

View File

@@ -0,0 +1,16 @@
from __future__ import annotations
import re
from utils import normalize_text
from config import config
def clean_title(t: str) -> str:
    """Normalise a heading and drop a trailing colon (with any whitespace after it)."""
    return re.sub(r":\s*$", "", normalize_text(t))
def is_blacklisted_heading(title: str) -> bool:
    """
    Tell whether a heading is on the configured ``blacklist -> product-details`` list.

    The comparison ignores case and surrounding whitespace.
    """
    blacklist = (config().get("blacklist") or {}).get("product-details") or []
    wanted = (title or "").strip().lower()
    for entry in blacklist:
        if (entry or "").strip().lower() == wanted:
            return True
    return False

View File

@@ -0,0 +1,48 @@
from __future__ import annotations
from typing import Dict, Tuple, Union
from utils import soup_of
from ..http_client import fetch
from ..html_utils import absolutize_fragment
from suma_browser.app.bp.browse.services.slugs import product_slug_from_href
from .registry import REGISTRY, merge_missing
from . import extractors as _auto_register # noqa: F401 (import-time side effects)
async def scrape_product_detail(product_url: str, include_html: bool = False) -> Union[dict, Tuple[dict, str]]:
    """
    Fetch a product page and run every registered extractor over it.

    Returns a dict with fields (subset):
      title, images, image, description_short, description_html, sections,
      slug, suma_href, stickers, labels, info_table fields, oe_list_price, prices,
      breadcrumbs-derived category_* fields.
    Extractor results are merged with ``merge_missing``, so earlier values are
    only replaced when empty. If include_html=True, returns (data, html).
    """
    import logging
    from html import escape

    logger = logging.getLogger(__name__)

    html = await fetch(product_url)

    data: Dict[str, Union[str, float, int, list, dict, None]] = {
        "suma_href": product_url,
        "slug": product_slug_from_href(product_url),
    }

    # Run all extractors; each gets a freshly parsed soup.
    for fn in REGISTRY:
        try:
            piece = fn(soup_of(html), product_url) or {}
        except Exception as exc:
            # Tolerate site drift, but leave a trace so a broken extractor
            # can be noticed.
            logger.warning(
                "extractor %s failed for %s: %r",
                getattr(fn, "__name__", fn), product_url, exc,
            )
            continue
        merge_missing(data, piece)

    # If we found short description but not description_html, echo it.
    # The short description is plain text, so escape it before wrapping.
    if not data.get("description_html") and data.get("description_short"):
        short_text = escape(str(data["description_short"]), quote=False)
        data["description_html"] = absolutize_fragment(f"<p>{short_text}</p>")

    # Ensure "image" mirrors first of images if not set
    if not data.get("image"):
        imgs = data.get("images") or []
        if isinstance(imgs, list) and imgs:
            data["image"] = imgs[0]

    if include_html:
        return data, html
    return data

View File

@@ -0,0 +1,4 @@
from __future__ import annotations
# Thin wrapper to keep import path stable
from .product_core import scrape_product_detail # re-export

View File

@@ -0,0 +1,20 @@
from __future__ import annotations
from typing import Callable, Dict, List, Union
# Shape shared by all extractors: (parsed page, page URL) -> partial record.
Extractor = Callable[[object, str], Dict[str, Union[str, float, int, list, dict, None]]]

# Registered extractors, in registration order.
REGISTRY: List[Extractor] = []


def extractor(fn: Extractor) -> Extractor:
    """Add ``fn`` to ``REGISTRY`` and return it unchanged, for use as a decorator."""
    REGISTRY.append(fn)
    return fn
def merge_missing(dst: dict, src: dict) -> None:
    """
    Copy entries from ``src`` into ``dst`` without overwriting real values.

    A key is written when ``dst`` lacks it or holds an empty value
    (None, "", [] or {}). ``dst`` is modified in place; a falsy ``src`` is a
    no-op.
    """
    empties = (None, "", [], {})
    for key, value in (src or {}).items():
        if key in dst and dst[key] not in empties:
            continue
        dst[key] = value