This repository has been archived on 2026-02-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
market/bp/api/routes.py
giles a80547c7fa
All checks were successful
Build and Deploy / build-and-deploy (push) Successful in 52s
Update shared submodule + emit product events for federation
- Emit product.listed events when products are created
- Updated shared with federation handlers, delivery, anchoring

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-21 16:00:04 +00:00

431 lines
14 KiB
Python

# products_api_async.py
from __future__ import annotations
from datetime import datetime, timezone
from decimal import Decimal
from typing import Any, Dict, List, Tuple, Iterable, Optional
from quart import Blueprint, request, jsonify, g
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from scrape.persist_snapshot.log_product_result import _log_product_result
from scrape.persist_snapshot.save_nav import _save_nav
from scrape.persist_snapshot.capture_listing import _capture_listing
from scrape.persist_snapshot.save_subcategory_redirects import _save_subcategory_redirects
# ⬇️ Import your models (names match your current file)
from models.market import (
Product,
ProductImage,
ProductSection,
ProductLabel,
ProductSticker,
ProductAttribute,
ProductNutrition,
ProductAllergen,
)
from shared.browser.app.redis_cacher import clear_cache
from shared.browser.app.csrf import csrf_exempt
# Blueprint that owns every endpoint defined in this module; the url_prefix
# means each route below is served under /api/products.
products_api = Blueprint("products_api", __name__, url_prefix="/api/products")
# ---- Comparison config (matches your schema) --------------------------------
# Scalar Product attributes that are read from a sync payload, both when
# comparing it with the stored row and when assigning it onto a Product.
PRODUCT_FIELDS: List[str] = [
    # identity / presentation
    "slug",
    "title",
    "image",
    "description_short",
    "description_html",
    "suma_href",
    "brand",
    # recommended retail price
    "rrp",
    "rrp_currency",
    "rrp_raw",
    # price per unit
    "price_per_unit",
    "price_per_unit_currency",
    "price_per_unit_raw",
    # special price
    "special_price",
    "special_price_currency",
    "special_price_raw",
    # regular price
    "regular_price",
    "regular_price_currency",
    "regular_price_raw",
    "oe_list_price",
    # case size
    "case_size_count",
    "case_size_item_qty",
    "case_size_item_unit",
    "case_size_raw",
    # codes and sizes
    "ean",
    "sku",
    "unit_size",
    "pack_size",
]
# Child relations of Product that take part in change detection.
# Maps relation name -> (model class, fields to compare, field whose value
# identifies a row so that rows can be compared regardless of order).
CHILD_SPECS: Dict[str, Tuple[Any, List[str], str]] = {
    "images": (
        ProductImage,
        ["url", "position", "kind"],
        "url",
    ),
    "sections": (
        ProductSection,
        ["title", "html"],
        "title",
    ),
    "labels": (
        ProductLabel,
        ["name"],
        "name",
    ),
    "stickers": (
        ProductSticker,
        ["name"],
        "name",
    ),
    "attributes": (
        ProductAttribute,
        ["key", "value"],
        "key",
    ),
    "nutrition": (
        ProductNutrition,
        ["key", "value", "unit"],
        "key",
    ),
    "allergens": (
        ProductAllergen,
        ["name", "contains"],
        "name",
    ),
}
def _now_utc():
return datetime.now(timezone.utc)
def _norm_scalar(v: Any) -> Any:
if isinstance(v, Decimal):
s = format(v.normalize(), "f")
return "0" if s in ("-0", "-0.0") else s
if isinstance(v, bool):
return bool(v)
if isinstance(v, (int, float, str)) or v is None:
return v
return str(v)
def _normalize_row(obj: Dict[str, Any], keep: List[str]) -> Dict[str, Any]:
    """Project *obj* onto the keys in *keep* and normalise each value.

    Missing keys yield ``None``. String values have surrounding whitespace
    removed before being passed through ``_norm_scalar``.
    """

    def _clean(raw: Any) -> Any:
        trimmed = raw.strip() if isinstance(raw, str) else raw
        return _norm_scalar(trimmed)

    return {name: _clean(obj.get(name)) for name in keep}
def _list_to_index(items: Iterable[Dict[str, Any]], uniq: str) -> Dict[Any, Dict[str, Any]]:
ix: Dict[Any, Dict[str, Any]] = {}
for it in items or []:
key = it.get(uniq)
if key is None:
continue
ix[key] = it
return ix
def _serialize_product_for_compare(p: Product) -> Dict[str, Any]:
    """Build the comparison form of a stored product.

    The result has one entry per name in ``PRODUCT_FIELDS`` plus, for every
    relation in ``CHILD_SPECS``, a dict of child rows indexed by that
    relation's unique field.

    Values go through ``_normalize_row`` - the same routine the payload side
    uses - so strings are whitespace-stripped on both sides. Previously only
    the payload side stripped. A child whose key field was stored with
    leading/trailing whitespace therefore produced a different index key
    than the identical incoming row, and the product could never compare
    equal to its own payload.

    Args:
        p: Product whose relations named in ``CHILD_SPECS`` are readable
            (a relation that evaluates falsy is treated as empty).

    Returns:
        Nested dict of normalised scalars and per-relation row indexes.
    """
    root: Dict[str, Any] = _normalize_row(
        {f: getattr(p, f) for f in PRODUCT_FIELDS},
        PRODUCT_FIELDS,
    )
    for rel_name, (_Model, fields, uniq) in CHILD_SPECS.items():
        rows: List[Dict[str, Any]] = [
            _normalize_row({f: getattr(child, f) for f in fields}, fields)
            for child in getattr(p, rel_name) or []
        ]
        root[rel_name] = _list_to_index(rows, uniq)
    return root
def _serialize_payload_for_compare(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Build the comparison form of an incoming sync payload.

    Top-level values are limited to ``PRODUCT_FIELDS`` and normalised. Each
    relation listed in ``CHILD_SPECS`` is read from the payload, entries
    that are not dicts are dropped, and the remaining rows are normalised
    and indexed by the relation's unique field.
    """
    normalized = _normalize_row(payload, PRODUCT_FIELDS)
    for relation, (_model, columns, index_key) in CHILD_SPECS.items():
        clean_rows = [
            _normalize_row(entry, columns)
            for entry in payload.get(relation) or []
            if isinstance(entry, dict)
        ]
        normalized[relation] = _list_to_index(clean_rows, index_key)
    return normalized
from decimal import Decimal, InvalidOperation
def _is_numeric_like(x) -> bool:
if isinstance(x, bool):
return False
if isinstance(x, (int, float, Decimal)):
return True
if isinstance(x, str):
s = x.strip()
if not s:
return False
try:
Decimal(s)
return True
except InvalidOperation:
return False
return False
def _to_decimal(x) -> Decimal:
if isinstance(x, Decimal):
return x
if isinstance(x, bool) or x is None:
raise InvalidOperation
if isinstance(x, (int, str)):
return Decimal(str(x).strip())
if isinstance(x, float):
return Decimal(str(x)) # avoid float fp artifacts
# last resort: string-coerce
return Decimal(str(x).strip())
def values_different(av, bv) -> bool:
    """Report whether *av* differs from *bv*.

    Rules, applied in order:

    1. If *bv* is ``None`` the values differ exactly when *av* is not ``None``.
    2. If only *av* is ``None`` they differ.
    3. If *bv* is numeric-like, both are compared as ``Decimal``; an *av*
       that cannot be converted counts as different.
    4. Otherwise their formatted text is compared.
    """
    if bv is None:
        return av is not None
    if av is None:
        return True
    if not _is_numeric_like(bv):
        # Textual comparison for everything that is not a number.
        return format(av) != format(bv)
    try:
        return _to_decimal(av) != _to_decimal(bv)
    except InvalidOperation:
        # One side is a number and the other cannot be parsed as one.
        return True
import re
_cf_a_re = re.compile(r'<a[^>]+/cdn-cgi/l/email-protection#[^"]+"[^>]*>(.*?)</a>', re.I | re.S)
_cf_span_re = re.compile(r'<span[^>]*class="__cf_email__"[^>]*>(.*?)</span>', re.I | re.S)
_cf_data_attr_re = re.compile(r'\sdata-cfemail="[^"]+"', re.I)
_ws_re = re.compile(r'\s+')
def normalize_cf_email(html: str) -> str:
    """Strip Cloudflare e-mail protection markup and collapse whitespace.

    Non-string input is returned untouched. For strings the following
    substitutions run in order, after which the result is trimmed:

    1. ``__cf_email__`` spans are replaced by their inner content.
    2. e-mail protection anchors are replaced by their inner content.
    3. any leftover ``data-cfemail`` attribute is removed.
    4. every run of whitespace becomes a single space.
    """
    if not isinstance(html, str):
        return html
    substitutions = (
        (_cf_span_re, r'\1'),
        (_cf_a_re, r'\1'),
        (_cf_data_attr_re, ''),
        (_ws_re, ' '),
    )
    text = html
    for pattern, replacement in substitutions:
        text = pattern.sub(replacement, text)
    return text.strip()
def _deep_equal(a: Dict[str, Any], b: Dict[str, Any]) -> bool:
    """Recursively compare two comparison dicts.

    The dicts are equal when they have the same key set and every pair of
    values matches:

    * dict vs dict - compared recursively;
    * list/tuple vs list/tuple - same length and pairwise-equal elements in
      order (dict elements recursively, others as scalars);
    * anything else - compared with ``values_different`` after both sides
      have been passed through ``normalize_cf_email``.
    """
    if a.keys() != b.keys():
        return False

    for key, left in a.items():
        right = b[key]

        if isinstance(left, dict) and isinstance(right, dict):
            if not _deep_equal(left, right):
                return False

        elif isinstance(left, (list, tuple)) and isinstance(right, (list, tuple)):
            if len(left) != len(right):
                return False
            for l_item, r_item in zip(left, right):
                if isinstance(l_item, dict) and isinstance(r_item, dict):
                    if not _deep_equal(l_item, r_item):
                        return False
                elif values_different(normalize_cf_email(l_item), normalize_cf_email(r_item)):
                    return False

        elif values_different(normalize_cf_email(left), normalize_cf_email(right)):
            return False

    return True
# ---- Mutation helpers -------------------------------------------------------
def _apply_product_fields(p: Product, payload: Dict[str, Any]) -> None:
    """Copy every ``PRODUCT_FIELDS`` value from *payload* onto *p*.

    Keys absent from the payload are assigned ``None``. ``updated_at`` is
    set to the current UTC time afterwards.
    """
    for name in PRODUCT_FIELDS:
        value = payload.get(name)
        setattr(p, name, value)
    p.updated_at = _now_utc()
def _replace_children(p: Product, payload: Dict[str, Any]) -> None:
    """Append the child rows described by *payload* to the relations of *p*.

    Despite the name, nothing is removed: the relation collections are not
    cleared, so rows are only ever appended. The visible caller passes a
    freshly constructed ``Product``, for which appending is equivalent to
    replacing. NOTE(review): calling this on a product that already has
    children would duplicate them.

    Changes from the previous version:

    * Entries that are not dicts are skipped (the payload comparison
      serializer already ignores them) instead of raising
      ``AttributeError`` on ``row.get``.
    * One timestamp is taken per call, so ``created_at`` and ``updated_at``
      of every new row are identical rather than microseconds apart.

    Args:
        p: Product receiving the children.
        payload: Sync payload; the keys ``images``, ``sections``, ``labels``,
            ``stickers``, ``attributes``, ``nutrition`` and ``allergens``
            are read, each expected to hold a list of dicts (missing or
            falsy means "no rows").
    """
    now = _now_utc()
    stamps = {"created_at": now, "updated_at": now}

    def _rows(rel_name: str) -> List[Dict[str, Any]]:
        # Only dict rows can be mapped onto a model instance.
        return [row for row in payload.get(rel_name) or [] if isinstance(row, dict)]

    for row in _rows("images"):
        p.images.append(ProductImage(
            url=row.get("url"),
            position=row.get("position") or 0,
            kind=row.get("kind") or "gallery",
            **stamps,
        ))

    for row in _rows("sections"):
        p.sections.append(ProductSection(
            title=row.get("title") or "",
            html=row.get("html") or "",
            **stamps,
        ))

    for row in _rows("labels"):
        p.labels.append(ProductLabel(
            name=row.get("name") or "",
            **stamps,
        ))

    for row in _rows("stickers"):
        p.stickers.append(ProductSticker(
            name=row.get("name") or "",
            **stamps,
        ))

    for row in _rows("attributes"):
        p.attributes.append(ProductAttribute(
            key=row.get("key") or "",
            value=row.get("value"),
            **stamps,
        ))

    for row in _rows("nutrition"):
        p.nutrition.append(ProductNutrition(
            key=row.get("key") or "",
            value=row.get("value"),
            unit=row.get("unit"),
            **stamps,
        ))

    for row in _rows("allergens"):
        p.allergens.append(ProductAllergen(
            name=row.get("name") or "",
            contains=bool(row.get("contains", False)),
            **stamps,
        ))
async def _create_product_from_payload(session: AsyncSession, payload: Dict[str, Any]) -> Product:
    """Create a Product (with children) from *payload* and announce it.

    Steps: build the Product, copy the scalar fields, stamp ``created_at``,
    make sure ``deleted_at`` is ``None``, add it to *session*, attach the
    child rows, flush, then emit a ``product.listed`` event carrying the
    title and description.

    Returns:
        The new Product, after the flush.
    """
    product = Product()
    _apply_product_fields(product, payload)
    product.created_at = _now_utc()
    product.deleted_at = None

    session.add(product)
    _replace_children(product, payload)
    # One flush after the children are attached; product.id is read below.
    await session.flush()

    # Announce the new product to federation.
    from shared.events import emit_event

    event_body = {
        "title": product.title or "",
        # Falls back to "" when Product has no ``description`` attribute.
        "description": getattr(product, "description", "") or "",
    }
    await emit_event(
        session,
        event_type="product.listed",
        aggregate_type="Product",
        aggregate_id=product.id,
        payload=event_body,
    )
    return product
# ---- API --------------------------------------------------------------------
@csrf_exempt
@products_api.post("/listing/")
@clear_cache(tag='browse')
async def capture_lsting():
    """POST /api/products/listing/ - store one captured listing page.

    The JSON body must contain ``url``, ``items`` and ``total_pages``; they
    are forwarded to ``_capture_listing`` together with ``g.s``.
    Responds with ``{"ok": True}``.
    """
    body: Dict[str, Any] = await request.get_json(force=True, silent=False)
    url, items, total_pages = body['url'], body['items'], body['total_pages']
    await _capture_listing(g.s, url, items, total_pages)
    return {"ok": True}
@csrf_exempt
@products_api.post("/log/")
@clear_cache(tag='browse')
async def log_product():
    """POST /api/products/log/ - record the outcome of one product scrape.

    The JSON body must contain ``ok`` (coerced to bool) and may contain
    ``payload`` (defaults to an empty dict). Both are handed to
    ``_log_product_result`` together with ``g.s``.

    Returns:
        ``{"ok": True}`` on success, ``{"ok": False}`` when
        ``_log_product_result`` raises. The failure is logged with its
        traceback; it used to be discarded silently, which made problems
        behind this endpoint invisible.
    """
    # Function-scope import so this handler needs no new module-level import.
    import logging

    data: Dict[str, Any] = await request.get_json(force=True, silent=False)
    ok = bool(data["ok"])
    payload = data.get("payload") or {}
    try:
        await _log_product_result(g.s, ok, payload)
    except Exception:
        # Deliberately best-effort: keep answering {"ok": False} rather than
        # turning the failure into a 500, but leave a trace of the cause.
        logging.getLogger(__name__).exception(
            "Failed to log product result (ok=%s)", ok
        )
        return {"ok": False}
    return {"ok": True}
@csrf_exempt
@products_api.post("/redirects/")
@clear_cache(tag='browse')
async def rediects():
    """POST /api/products/redirects/ - store subcategory redirects.

    The parsed JSON body is passed unchanged to
    ``_save_subcategory_redirects`` together with ``g.s``.
    Responds with ``{"ok": True}``.
    """
    redirect_map: Dict[str, str] = await request.get_json(force=True, silent=False)
    await _save_subcategory_redirects(g.s, redirect_map)
    return {"ok": True}
@csrf_exempt
@products_api.post("/nav/")
@clear_cache(tag='browse')
async def save_nav():
    """POST /api/products/nav/ - store navigation data.

    The parsed JSON body is passed to ``_save_nav`` together with ``g.s``
    and the id of ``g.market`` (``None`` when ``g`` has no truthy
    ``market``). Responds with ``{"ok": True}``.
    """
    nav_data: Dict[str, Any] = await request.get_json(force=True, silent=False)
    current_market = getattr(g, "market", None)
    market_id = None if not current_market else current_market.id
    await _save_nav(g.s, nav_data, market_id=market_id)
    return {"ok": True}
@csrf_exempt
@products_api.post("/sync/")
@clear_cache(tag='browse')
async def sync_product():
    """
    POST /api/products/sync/

    Upsert one product, identified by its slug, from a JSON body holding the
    top-level fields and child arrays, e.g.:
    {
      "slug": "my-product",
      "title": "...",
      "images": [{"url":"https://..","position":0,"kind":"gallery"}],
      "sections": [{"title":"Details","html":"<p>..</p>"}],
      "labels": [{"name":"Vegan"}],
      "stickers": [{"name":"Sale"}],
      "attributes": [{"key":"Country","value":"UK"}],
      "nutrition": [{"key":"Energy","value":"100","unit":"kcal"}],
      "allergens": [{"name":"Nuts","contains":true}]
    }

    Outcomes:
      400 - body is not a JSON object, or "slug" is missing/empty/not a string
      200 {"action": "touched"}  - a live row exists and matches; only
                                   updated_at is refreshed
      201 {"action": "replaced"} - a live row exists but differs; it is
                                   soft-deleted and a new row is created
      201 {"action": "created"}  - no live row exists for the slug
    """
    payload = await request.get_json(force=True, silent=False)
    if not isinstance(payload, dict):
        return jsonify({"error": "Invalid JSON"}), 400

    slug = payload.get("slug")
    if not (isinstance(slug, str) and slug):
        return jsonify({"error": "Missing 'slug'"}), 400

    # Load the live (not soft-deleted) row for this slug, with every child
    # relation fetched eagerly so the comparison can read them.
    child_relations = (
        Product.images,
        Product.sections,
        Product.labels,
        Product.stickers,
        Product.attributes,
        Product.nutrition,
        Product.allergens,
    )
    query = (
        select(Product)
        .where(Product.slug == slug, Product.deleted_at.is_(None))
        .options(*(selectinload(rel) for rel in child_relations))
    )
    result = await g.s.execute(query)
    existing: Optional[Product] = result.scalars().first()

    incoming_norm = _serialize_payload_for_compare(payload)

    if not existing:
        # Nothing live under this slug yet -> plain create.
        created = await _create_product_from_payload(g.s, payload)
        await g.s.flush()
        return jsonify({"id": created.id, "action": "created"}), 201

    if _deep_equal(_serialize_product_for_compare(existing), incoming_norm):
        # Unchanged -> only refresh the timestamp.
        existing.updated_at = _now_utc()
        await g.s.flush()
        return jsonify({"id": existing.id, "action": "touched"}), 200

    # Changed -> retire the old row first, then insert its successor.
    existing.deleted_at = _now_utc()
    await g.s.flush()  # the soft delete must be flushed before the new insert
    replacement = await _create_product_from_payload(g.s, payload)
    await g.s.flush()
    return jsonify({"id": replacement.id, "action": "replaced"}), 201