# products_api_async.py
"""Quart blueprint through which scraped product data is pushed into the market DB.

All routes live under ``/api/products``, are wrapped in ``csrf_exempt`` and
``clear_cache(tag='browse')``, read a JSON body, and use the request-scoped
session ``g.s``:

* ``POST /listing/``   -> ``_capture_listing(g.s, url, items, total_pages)``
* ``POST /log/``       -> ``_log_product_result(g.s, ok, payload)``
* ``POST /redirects/`` -> ``_save_subcategory_redirects(g.s, data)``
* ``POST /nav/``       -> ``_save_nav(g.s, data, market_id=...)``
* ``POST /sync/``      -> compare the payload with the live ``Product`` row for its
  slug and touch it, replace it (soft-delete + insert), or create it.
"""
from __future__ import annotations

import logging
import re
from datetime import datetime, timezone
from decimal import Decimal, InvalidOperation
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

from quart import Blueprint, request, jsonify, g
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload

from scrape.persist_snapshot.log_product_result import _log_product_result
from scrape.persist_snapshot.save_nav import _save_nav
from scrape.persist_snapshot.capture_listing import _capture_listing
from scrape.persist_snapshot.save_subcategory_redirects import _save_subcategory_redirects

# ⬇️ Import your models (names match your current file)
from models.market import (
    Product,
    ProductImage,
    ProductSection,
    ProductLabel,
    ProductSticker,
    ProductAttribute,
    ProductNutrition,
    ProductAllergen,
)
from shared.browser.app.redis_cacher import clear_cache
from shared.browser.app.csrf import csrf_exempt

logger = logging.getLogger(__name__)

products_api = Blueprint("products_api", __name__, url_prefix="/api/products")


# ---- Comparison config (matches your schema) --------------------------------

# Scalar Product columns that are copied from the payload and compared on sync.
PRODUCT_FIELDS: List[str] = [
    "slug", "title", "image", "description_short", "description_html", "suma_href", "brand",
    "rrp", "rrp_currency", "rrp_raw",
    "price_per_unit", "price_per_unit_currency", "price_per_unit_raw",
    "special_price", "special_price_currency", "special_price_raw",
    "regular_price", "regular_price_currency", "regular_price_raw",
    "oe_list_price",
    "case_size_count", "case_size_item_qty", "case_size_item_unit", "case_size_raw",
    "ean", "sku", "unit_size", "pack_size",
]

# rel_name -> (Model, fields_to_compare, key_for_orderless_compare)
CHILD_SPECS: Dict[str, Tuple[Any, List[str], str]] = {
    "images": (ProductImage, ["url", "position", "kind"], "url"),
    "sections": (ProductSection, ["title", "html"], "title"),
    "labels": (ProductLabel, ["name"], "name"),
    "stickers": (ProductSticker, ["name"], "name"),
    "attributes": (ProductAttribute, ["key", "value"], "key"),
    "nutrition": (ProductNutrition, ["key", "value", "unit"], "key"),
    "allergens": (ProductAllergen, ["name", "contains"], "name"),
}

# rel_name -> builder mapping one payload row to the column values stored on the
# child model. Both the insert path (_replace_children) and the comparison path
# (_serialize_payload_for_compare) use these builders. That way a default such as
# kind="gallery" or position=0 can never make an unchanged product look different
# from its own stored copy.
_CHILD_VALUE_BUILDERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "images": lambda r: {
        "url": r.get("url"),
        "position": r.get("position") or 0,
        "kind": r.get("kind") or "gallery",
    },
    "sections": lambda r: {
        "title": r.get("title") or "",
        "html": r.get("html") or "",
    },
    "labels": lambda r: {"name": r.get("name") or ""},
    "stickers": lambda r: {"name": r.get("name") or ""},
    "attributes": lambda r: {
        "key": r.get("key") or "",
        "value": r.get("value"),
    },
    "nutrition": lambda r: {
        "key": r.get("key") or "",
        "value": r.get("value"),
        "unit": r.get("unit"),
    },
    "allergens": lambda r: {
        "name": r.get("name") or "",
        "contains": bool(r.get("contains", False)),
    },
}


def _now_utc() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


def _norm_scalar(v: Any) -> Any:
    """Normalize one scalar for comparison.

    Decimals become plain (non-exponent) strings with trailing zeros removed, and
    negative zero is mapped to "0". bool/int/float/str/None pass through unchanged.
    Anything else is stringified.
    """
    if isinstance(v, Decimal):
        s = format(v.normalize(), "f")
        return "0" if s in ("-0", "-0.0") else s
    if isinstance(v, bool):
        return bool(v)
    if isinstance(v, (int, float, str)) or v is None:
        return v
    return str(v)


def _normalize_row(obj: Dict[str, Any], keep: List[str]) -> Dict[str, Any]:
    """Return {field: normalized value} for each field in *keep*.

    Missing fields become None. Strings are stripped before _norm_scalar is applied.
    """
    out: Dict[str, Any] = {}
    for f in keep:
        val = obj.get(f)
        if isinstance(val, str):
            val = val.strip()
        out[f] = _norm_scalar(val)
    return out


def _list_to_index(items: Iterable[Dict[str, Any]], uniq: str) -> Dict[Any, Dict[str, Any]]:
    """Index rows by their *uniq* field so the comparison ignores order.

    Rows whose key is None are dropped. With duplicate keys, the last row wins.
    """
    ix: Dict[Any, Dict[str, Any]] = {}
    for it in items or []:
        key = it.get(uniq)
        if key is None:
            continue
        ix[key] = it
    return ix


def _child_rows(payload: Dict[str, Any], rel_name: str) -> List[Dict[str, Any]]:
    """Return the dict entries of payload[rel_name].

    A missing or non-list value yields []. Non-dict entries are skipped.
    """
    rows = payload.get(rel_name)
    if not isinstance(rows, (list, tuple)):
        return []
    return [r for r in rows if isinstance(r, dict)]


def _child_values(rel_name: str, row: Dict[str, Any]) -> Dict[str, Any]:
    """Return the column values that would be stored for one payload row of *rel_name*."""
    return _CHILD_VALUE_BUILDERS[rel_name](row)


def _serialize_product_for_compare(p: Product) -> Dict[str, Any]:
    """Build the comparison snapshot of a stored Product and its loaded child rows.

    The result holds the PRODUCT_FIELDS values plus one {uniq_key: row} index per
    relation in CHILD_SPECS. The relations must already be loaded (sync_product
    uses selectinload for this).
    """
    root = _normalize_row({f: getattr(p, f) for f in PRODUCT_FIELDS}, PRODUCT_FIELDS)
    for rel_name, (_model, fields, uniq) in CHILD_SPECS.items():
        rows = [
            _normalize_row({f: getattr(child, f) for f in fields}, fields)
            for child in getattr(p, rel_name) or []
        ]
        root[rel_name] = _list_to_index(rows, uniq)
    return root


def _serialize_payload_for_compare(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build the comparison snapshot of an incoming payload.

    The snapshot has the same shape as _serialize_product_for_compare. Child rows
    first go through the same value builders used on insert, so stored defaults
    match on both sides.
    """
    root = _normalize_row(payload, PRODUCT_FIELDS)
    for rel_name, (_model, fields, uniq) in CHILD_SPECS.items():
        rows = [
            _normalize_row(_child_values(rel_name, r), fields)
            for r in _child_rows(payload, rel_name)
        ]
        root[rel_name] = _list_to_index(rows, uniq)
    return root


def _is_numeric_like(x) -> bool:
    """True for int/float/Decimal, or a non-blank string that Decimal() can parse.

    bool is excluded even though it subclasses int.
    """
    if isinstance(x, bool):
        return False
    if isinstance(x, (int, float, Decimal)):
        return True
    if isinstance(x, str):
        s = x.strip()
        if not s:
            return False
        try:
            Decimal(s)
            return True
        except InvalidOperation:
            return False
    return False


def _to_decimal(x) -> Decimal:
    """Convert *x* to Decimal.

    Raises:
        InvalidOperation: for bool, None, or text that does not parse.
    """
    if isinstance(x, Decimal):
        return x
    if isinstance(x, bool) or x is None:
        raise InvalidOperation
    # Going through str() converts floats via their shortest repr, which avoids
    # binary floating-point artifacts (Decimal(0.1) != Decimal("0.1")).
    return Decimal(str(x).strip())


def values_different(av, bv) -> bool:
    """Return True if stored value *av* differs from incoming value *bv*.

    If *bv* is None, *av* must also be None. If *bv* looks numeric, both sides
    compare as Decimals, so "1.0" equals 1; an unparsable *av* counts as different.
    Otherwise both sides compare as their str() forms.
    """
    # match original None semantics first
    if bv is None:
        return av is not None
    if av is None:
        return True

    if _is_numeric_like(bv):
        try:
            return _to_decimal(av) != _to_decimal(bv)
        except InvalidOperation:
            # av isn't numeric-parsable → different
            return True
    # non-numeric: compare as strings
    return f"{av}" != f"{bv}"


# Cloudflare "email protection" rewrites addresses into markup whose obfuscated
# payload (href hash / data-cfemail) can change between scrapes. These patterns
# reduce that markup to its visible text so unchanged pages compare equal.
_cf_a_re = re.compile(r'<a[^>]+/cdn-cgi/l/email-protection#[^"]+"[^>]*>(.*?)</a>', re.I | re.S)
_cf_span_re = re.compile(r'<span[^>]*class="__cf_email__"[^>]*>(.*?)</span>', re.I | re.S)
_cf_data_attr_re = re.compile(r'\sdata-cfemail="[^"]+"', re.I)
_ws_re = re.compile(r'\s+')


def normalize_cf_email(html: str) -> str:
    """Strip Cloudflare email-protection markup and collapse whitespace.

    Non-string inputs are returned unchanged.
    """
    if not isinstance(html, str):
        return html
    s = html
    # Replace CF spans with their inner text (spans may sit inside the anchors).
    s = _cf_span_re.sub(r'\1', s)
    # Replace CF protection anchors with their inner text
    s = _cf_a_re.sub(r'\1', s)
    # Drop the data-cfemail attribute if any remains
    s = _cf_data_attr_re.sub('', s)
    # Collapse whitespace so formatting-only differences compare equal.
    s = _ws_re.sub(' ', s).strip()
    return s


def _deep_equal(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    """Recursively compare two comparison snapshots.

    Dict keys must match exactly at every level. Lists and tuples compare
    element-wise, in order. Leaves go through normalize_cf_email and are then
    compared with values_different(a_leaf, b_leaf).
    """
    # keys must match at this level
    if a.keys() != b.keys():
        return False

    for k in a.keys():
        av, bv = a[k], b[k]

        # Dicts: recurse, returning early only on a mismatch.
        if isinstance(av, dict) and isinstance(bv, dict):
            if not _deep_equal(av, bv):
                return False
            continue

        # Lists/Tuples: compare length then elements (order-sensitive here)
        if isinstance(av, (list, tuple)) and isinstance(bv, (list, tuple)):
            if len(av) != len(bv):
                return False
            for ai, bi in zip(av, bv):
                # nested dicts within lists
                if isinstance(ai, dict) and isinstance(bi, dict):
                    if not _deep_equal(ai, bi):
                        return False
                elif values_different(normalize_cf_email(ai), normalize_cf_email(bi)):
                    return False
            continue

        # Scalars / everything else
        if values_different(normalize_cf_email(av), normalize_cf_email(bv)):
            return False

    return True


# ---- Mutation helpers -------------------------------------------------------

def _apply_product_fields(p: Product, payload: Dict[str, Any], now: Optional[datetime] = None) -> None:
    """Copy every PRODUCT_FIELDS value from *payload* onto *p* (missing keys -> None).

    Also stamps ``updated_at`` with *now*, or with the current UTC time if *now* is None.
    """
    for f in PRODUCT_FIELDS:
        setattr(p, f, payload.get(f))
    p.updated_at = now if now is not None else _now_utc()


def _replace_children(p: Product, payload: Dict[str, Any], now: Optional[datetime] = None) -> None:
    """Append one child model per payload row to each relationship in CHILD_SPECS.

    Existing children are NOT cleared. The only caller passes a freshly constructed
    Product, whose collections start empty. Clearing an unloaded collection under
    an AsyncSession would need a lazy load.
    Non-dict rows are skipped, matching the comparison path.
    """
    ts = now if now is not None else _now_utc()
    for rel_name, (model, _fields, _uniq) in CHILD_SPECS.items():
        collection = getattr(p, rel_name)
        for row in _child_rows(payload, rel_name):
            collection.append(
                model(**_child_values(rel_name, row), created_at=ts, updated_at=ts)
            )


async def _create_product_from_payload(session: AsyncSession, payload: Dict[str, Any]) -> Product:
    """Insert a new live Product (plus child rows) built from *payload*.

    After flushing, it publishes a federation "Create" activity for the product.
    Returns the flushed Product, with ``id`` populated.
    """
    now = _now_utc()
    p = Product()
    _apply_product_fields(p, payload, now)
    p.created_at = now
    p.deleted_at = None
    session.add(p)
    _replace_children(p, payload, now)
    await session.flush()  # assigns p.id, used as source_id below

    # Publish to federation inline. The import is kept function-local as in the
    # original; presumably this avoids an import cycle — TODO confirm.
    from shared.services.federation_publish import try_publish
    await try_publish(
        session,
        user_id=getattr(p, "user_id", None),
        activity_type="Create",
        object_type="Object",
        object_data={
            "name": p.title or "",
            # NOTE(review): PRODUCT_FIELDS has description_short/description_html
            # but no "description"; confirm Product really has this attribute,
            # otherwise the summary is always "".
            "summary": getattr(p, "description", "") or "",
        },
        source_type="Product",
        source_id=p.id,
    )
    return p


# ---- API --------------------------------------------------------------------

@csrf_exempt
@products_api.post("/listing/")
@clear_cache(tag='browse')
async def capture_lsting():
    """Forward a captured listing ({url, items, total_pages}) to _capture_listing.

    The misspelled function name is kept: Quart uses it as the endpoint name, so
    renaming it would change url_for() targets.
    """
    data: Dict[str, Any] = await request.get_json(force=True, silent=False)
    url = data['url']
    items = data['items']
    total_pages = data['total_pages']
    await _capture_listing(g.s, url, items, total_pages)
    return {"ok": True}


@csrf_exempt
@products_api.post("/log/")
@clear_cache(tag='browse')
async def log_product():
    """Record a per-product scrape result ({ok, payload}).

    Returns {"ok": True} on success, or {"ok": False} if _log_product_result raises.
    """
    data: Dict[str, Any] = await request.get_json(force=True, silent=False)
    ok = bool(data["ok"])
    payload = data.get("payload") or {}
    try:
        await _log_product_result(g.s, ok, payload)
    except Exception:
        # Best-effort by design: the caller gets {"ok": False} instead of a 500.
        # The failure is still logged with its traceback instead of vanishing.
        logger.exception("Failed to log product result (ok=%s)", ok)
        return {"ok": False}
    return {"ok": True}


@csrf_exempt
@products_api.post("/redirects/")
@clear_cache(tag='browse')
async def rediects():
    """Hand the JSON body to _save_subcategory_redirects.

    The misspelled name is kept because it is the Quart endpoint name.
    """
    data: Dict[str, str] = await request.get_json(force=True, silent=False)
    await _save_subcategory_redirects(g.s, data)
    return {"ok": True}


@csrf_exempt
@products_api.post("/nav/")
@clear_cache(tag='browse')
async def save_nav():
    """Hand the JSON body to _save_nav, scoped to g.market's id (None if there is no market)."""
    data: Dict[str, Any] = await request.get_json(force=True, silent=False)
    market = getattr(g, "market", None)
    market_id = market.id if market else None
    await _save_nav(g.s, data, market_id=market_id)
    return {"ok": True}


@csrf_exempt
@products_api.post("/sync/")
@clear_cache(tag='browse')
async def sync_product():
    """
    POST /api/products/sync/

    Body includes top-level fields and child arrays like:
    {
      "slug": "my-product",
      "title": "...",
      "images": [{"url":"https://..","position":0,"kind":"gallery"}],
      "sections": [{"title":"Details","html":"<p>..</p>"}],
      "labels": [{"name":"Vegan"}],
      "stickers": [{"name":"Sale"}],
      "attributes": [{"key":"Country","value":"UK"}],
      "nutrition": [{"key":"Energy","value":"100","unit":"kcal"}],
      "allergens": [{"name":"Nuts","contains":true}]
    }

    Responses:
      200 {"id", "action": "touched"}  - the live row equals the payload; only updated_at changes
      201 {"id", "action": "replaced"} - the live row differed; it is soft-deleted and a new row inserted
      201 {"id", "action": "created"}  - no live row existed for this slug
      400 {"error": ...}               - body is not a JSON object, or slug is missing/empty
    """
    payload = await request.get_json(force=True, silent=False)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    slug = payload.get("slug")
    if not isinstance(slug, str) or not slug:
        return jsonify({"error": "Missing 'slug'"}), 400

    # Find the undeleted row by slug. Children are eager-loaded because lazy
    # loading is not available on an AsyncSession.
    stmt = (
        select(Product)
        .where(Product.slug == slug, Product.deleted_at.is_(None))
        .options(
            selectinload(Product.images),
            selectinload(Product.sections),
            selectinload(Product.labels),
            selectinload(Product.stickers),
            selectinload(Product.attributes),
            selectinload(Product.nutrition),
            selectinload(Product.allergens),
        )
    )
    existing: Optional[Product] = (await g.s.execute(stmt)).scalars().first()

    incoming_norm = _serialize_payload_for_compare(payload)

    if existing:
        db_norm = _serialize_product_for_compare(existing)
        if _deep_equal(db_norm, incoming_norm):
            # Exactly equal → just touch updated_at
            existing.updated_at = _now_utc()
            await g.s.flush()
            return jsonify({"id": existing.id, "action": "touched"}), 200

        # Different → soft delete old + create a new row
        existing.deleted_at = _now_utc()
        await g.s.flush()  # ensure the soft-delete is persisted before inserting the new row

        new_p = await _create_product_from_payload(g.s, payload)
        await g.s.flush()
        return jsonify({"id": new_p.id, "action": "replaced"}), 201

    # Not found → create
    new_p = await _create_product_from_payload(g.s, payload)
    await g.s.flush()
    return jsonify({"id": new_p.id, "action": "created"}), 201