# at top of persist_snapshot.py:
import traceback
from datetime import datetime
from typing import Any, Dict, Set, Tuple

from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from models.market import (
    Product,
    ProductImage,
    ProductSection,
    ProductLabel,
    ProductSticker,
    ProductAttribute,
    ProductNutrition,
    ProductAllergen,
)
from shared.db.session import get_session

from ._get import _get
from .log_product_result import _log_product_result

# --- Models are unchanged, see original code ---

# ---------------------- Helper fns called from scraper ------------------------

# Product columns that are copied from the payload through `_get`, in order.
_GET_FIELDS = (
    "title",
    "image",
    "description_short",
    "description_html",
    "suma_href",
    "brand",
    "rrp",
    "rrp_currency",
    "rrp_raw",
    "price_per_unit",
    "price_per_unit_currency",
    "price_per_unit_raw",
    "special_price",
    "special_price_currency",
    "special_price_raw",
    "regular_price",
    "regular_price_currency",
    "regular_price_raw",
    "case_size_count",
    "case_size_item_qty",
    "case_size_item_unit",
    "case_size_raw",
)

# Identity tuple -> extra constructor kwargs for rows that should be active.
_Wanted = Dict[Tuple, Dict[str, Any]]


async def _active_children(session: AsyncSession, model, product_id):
    """Return the rows of *model* for *product_id* that are not soft-deleted."""
    result = await session.execute(
        select(model).where(
            model.product_id == product_id,
            model.deleted_at.is_(None),
        )
    )
    return result.scalars().all()


async def _soft_delete(
    session: AsyncSession, model, product_id, match: Dict[str, Any], now
) -> None:
    """Set ``deleted_at=now`` on active *model* rows whose columns equal *match*.

    A ``None`` value in *match* is rendered by SQLAlchemy as ``IS NULL``.
    """
    conditions = [getattr(model, column) == value for column, value in match.items()]
    await session.execute(
        update(model)
        .where(
            model.product_id == product_id,
            *conditions,
            model.deleted_at.is_(None),
        )
        .values(deleted_at=now)
    )


async def _sync_rows(
    session: AsyncSession,
    model,
    product_id,
    columns: Tuple[str, ...],
    wanted: _Wanted,
    now,
) -> None:
    """Make the active *model* rows of a product match *wanted*.

    Rows are identified by the tuple of their *columns* values. Identities in
    *wanted* that are not active yet are inserted (with their extra kwargs);
    active identities missing from *wanted* are soft-deleted.
    """
    rows = await _active_children(session, model, product_id)
    existing = {tuple(getattr(row, column) for column in columns) for row in rows}

    for identity, extra in wanted.items():
        if identity not in existing:
            session.add(
                model(product_id=product_id, **dict(zip(columns, identity)), **extra)
            )

    for identity in existing.difference(wanted):
        await _soft_delete(
            session, model, product_id, dict(zip(columns, identity)), now
        )


async def _sync_names(
    session: AsyncSession, model, product_id, wanted: Set[str], now
) -> None:
    """Make the active name-only rows (labels, stickers) match *wanted*.

    Stored names are compared after ``strip()``; the soft-delete then matches
    on that stripped value.
    """
    rows = await _active_children(session, model, product_id)
    existing = {row.name.strip() for row in rows}

    for name in wanted - existing:
        session.add(model(product_id=product_id, name=name))

    for name in existing - wanted:
        await _soft_delete(session, model, product_id, {"name": name}, now)


def _wanted_sections(d: Dict) -> _Wanted:
    """Collect ``(title, html)`` pairs from ``d["sections"]``; skip incomplete ones."""
    wanted: _Wanted = {}
    for sec in d.get("sections") or []:
        if isinstance(sec, dict) and sec.get("title") and sec.get("html"):
            wanted.setdefault((sec["title"], sec["html"]), {})
    return wanted


def _wanted_images(d: Dict) -> _Wanted:
    """Collect ``(url, kind)`` pairs from the three image lists of the payload.

    The extra kwargs carry the index of the URL's first occurrence in its list
    as ``position``.
    """
    wanted: _Wanted = {}
    sources = (
        ("gallery", d.get("images") or []),
        ("embedded", d.get("embedded_image_urls") or []),
        ("all", d.get("all_image_urls") or []),
    )
    for kind, urls in sources:
        for idx, url in enumerate(urls):
            if url:
                wanted.setdefault((url, kind), {"position": idx})
    return wanted


def _wanted_attributes(d: Dict) -> _Wanted:
    """Collect ``(key, value)`` pairs from ``info_table`` and ``oe_list_price``.

    Keys are prefixed with the name of the mapping they came from.
    """
    wanted: _Wanted = {}
    for prefix in ("info_table", "oe_list_price"):
        for k, v in (d.get(prefix) or {}).items():
            key = f"{prefix}/{str(k).strip()}"
            val = None if v is None else str(v)
            wanted.setdefault((key, val), {})
    return wanted


def _wanted_nutrition(d: Dict) -> _Wanted:
    """Collect ``(key, value, unit)`` triples from ``d["nutrition"]``.

    Accepts either a mapping of key -> value (unit is ``None``) or a list of
    dicts with ``key`` / ``value`` / ``unit`` entries. List items that are not
    dicts, or that have an empty key, are skipped.
    """
    wanted: _Wanted = {}
    nutrition = d.get("nutrition") or []
    if isinstance(nutrition, dict):
        for k, v in nutrition.items():
            val = None if v is None else str(v)
            wanted.setdefault((str(k).strip(), val, None), {})
    elif isinstance(nutrition, list):
        for row in nutrition:
            # Best-effort: malformed rows are ignored rather than failing the upsert.
            if not isinstance(row, dict):
                continue
            key = str(row.get("key") or "").strip()
            if not key:
                continue
            val = None if row.get("value") is None else str(row.get("value"))
            unit = None if row.get("unit") is None else str(row.get("unit"))
            wanted.setdefault((key, val, unit), {})
    return wanted


def _wanted_allergens(d: Dict) -> _Wanted:
    """Collect ``(name, contains)`` pairs from ``d["allergens"]``.

    A plain string means ``contains=True``; a dict supplies ``name`` and an
    optional ``contains`` (default ``True``). Other item types and empty names
    are skipped.
    """
    wanted: _Wanted = {}
    for a in d.get("allergens") or []:
        if isinstance(a, str):
            nm, contains = a.strip(), True
        elif isinstance(a, dict):
            nm, contains = (a.get("name") or "").strip(), bool(a.get("contains", True))
        else:
            continue
        if nm:
            wanted.setdefault((nm, contains), {})
    return wanted


async def _upsert_product(session: AsyncSession, d: Dict) -> Product:
    """Create or update the Product identified by ``d["slug"]`` and sync its children.

    Looks up the non-deleted Product with the payload's slug (creating one if
    none exists), copies the scalar fields from *d*, then reconciles sections,
    images, labels, stickers, attributes, nutrition rows and allergens: new
    entries are inserted and entries no longer in the payload are soft-deleted
    by setting ``deleted_at``. The session is flushed but not committed.

    Args:
        session: Open async session; the caller owns commit/rollback.
        d: Scraped product payload.

    Returns:
        The persistent ``Product`` instance.

    Raises:
        ValueError: If the payload has no slug.
    """
    slug = d.get("slug")
    if not slug:
        raise ValueError("product missing slug")

    res = await session.execute(
        select(Product).where(Product.slug == slug, Product.deleted_at.is_(None))
    )
    p = res.scalar_one_or_none()
    if not p:
        p = Product(slug=slug)
        session.add(p)

    for field in _GET_FIELDS:
        setattr(p, field, _get(d, field))
    p.ean = d.get("ean") or d.get("barcode") or None
    p.sku = d.get("sku")
    p.unit_size = d.get("unit_size")
    p.pack_size = d.get("pack_size")
    p.updated_at = func.now()

    # A freshly added Product has no primary key until it is flushed. Flush now
    # so every child query/insert below sees a real p.id instead of relying on
    # autoflush timing.
    await session.flush()
    product_id = p.id

    # Naive UTC timestamp, kept as in the original code.
    # NOTE(review): switch to an aware datetime only if deleted_at is a
    # timezone-aware column - not visible from this file.
    now = datetime.utcnow()

    await _sync_rows(
        session, ProductSection, product_id, ("title", "html"), _wanted_sections(d), now
    )
    await _sync_rows(
        session, ProductImage, product_id, ("url", "kind"), _wanted_images(d), now
    )

    labels = {str(name).strip() for name in (d.get("labels") or []) if name}
    await _sync_names(session, ProductLabel, product_id, labels, now)

    # Sticker names are lower-cased before being compared and stored.
    stickers = {str(name).strip().lower() for name in (d.get("stickers") or []) if name}
    await _sync_names(session, ProductSticker, product_id, stickers, now)

    await _sync_rows(
        session, ProductAttribute, product_id, ("key", "value"), _wanted_attributes(d), now
    )
    await _sync_rows(
        session,
        ProductNutrition,
        product_id,
        ("key", "value", "unit"),
        _wanted_nutrition(d),
        now,
    )
    await _sync_rows(
        session,
        ProductAllergen,
        product_id,
        ("name", "contains"),
        _wanted_allergens(d),
        now,
    )

    await session.flush()
    return p


async def upsert_product(
    slug,
    href,
    d: Dict,
) -> None:
    """Persist one scraped product and record the outcome.

    Opens a session, upserts the product described by *d*, writes a success
    entry through ``_log_product_result`` and commits. On any error the
    details are printed, the failed transaction is rolled back, a failure
    entry is written and committed, and the original exception is re-raised.

    Args:
        slug: Product slug as known to the caller (used in the log payload).
        href: URL that was scraped (used in the log payload).
        d: Scraped product payload passed to ``_upsert_product``.

    Raises:
        Exception: Whatever ``_upsert_product`` or the success logging raised.
    """
    async with get_session() as session:
        try:
            await _upsert_product(session, d)
            await _log_product_result(session, ok=True, payload={
                "slug": slug,
                "href_tried": href,
                "title": d.get("title"),
                "has_description_html": bool(d.get("description_html")),
                "has_description_short": bool(d.get("description_short")),
                "sections_count": len(d.get("sections") or []),
                # `or []` guards against missing/None lists, which previously
                # raised TypeError after a successful upsert.
                "images_count": len(d.get("images") or []),
                "embedded_images_count": len(d.get("embedded_image_urls") or []),
                "all_images_count": len(d.get("all_image_urls") or []),
            })
        except Exception as e:
            print(f"[ERROR] Failed to upsert product '{d.get('slug')}'")
            print(f" Title: {d.get('title')}")
            print(f" URL: {d.get('suma_href')}")
            print(f" Error type: {type(e).__name__}")
            print(f" Error message: {e}")
            traceback.print_exc()

            # Discard the partial product writes; after a failed flush the
            # session cannot be used again until it is rolled back.
            await session.rollback()
            try:
                await _log_product_result(session, ok=False, payload={
                    "slug": d.get("slug") or slug,
                    "href_tried": d.get("suma_href") or href,
                    "error_type": type(e).__name__,
                    "error_message": str(e),
                    "title": d.get("title"),
                })
                # Commit before re-raising so the failure entry is not lost.
                await session.commit()
            except Exception:
                # Never let a logging problem hide the original error.
                print("[ERROR] Failed to record product failure log")
                traceback.print_exc()
            raise
        await session.commit()