"""Persist scraped listing snapshots: nav lookup, listing upsert, item slug sync."""

import re
from datetime import datetime
from typing import List, Optional, Tuple
from urllib.parse import urlparse

from sqlalchemy import select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession

from models.market import (
    Listing,
    ListingItem,
    NavSub,
    NavTop,
)
from shared.db.session import get_session

# --- Models are unchanged, see original code ---

# Strips a trailing ".html" / ".htm" (any case) from a path segment.
# Bug fix: the previous pattern r"\\.(html?|HTML?)$" required a literal
# backslash before the extension, so the suffix was never actually removed.
_HTML_SUFFIX_RE = re.compile(r"\.html?$", re.IGNORECASE)

# ---------------------- Helper fns called from scraper ------------------------


async def capture_listing(url: str, items: List[str], total_pages: int) -> None:
    """Persist one listing snapshot for *url* in its own session and commit.

    Args:
        url: The listing page URL; its path encodes the nav category slugs.
        items: Product slugs scraped from the listing page.
        total_pages: Total page count reported by the listing, if known.
    """
    async with get_session() as session:
        await _capture_listing(session, url, items, total_pages)
        await session.commit()


async def _capture_listing(
    session: AsyncSession, url: str, items: List[str], total_pages: int
) -> None:
    """Resolve nav ids from *url*, then upsert the listing and its items."""
    top_id, sub_id = await _nav_ids_from_list_url(session, url)
    await _save_listing(session, top_id, sub_id, items, total_pages)


async def _save_listing(
    session: AsyncSession,
    top_id: int,
    sub_id: Optional[int],
    items: List[str],
    total_pages: Optional[int],
) -> None:
    """Create or refresh the Listing row and sync its ListingItem slugs.

    Slugs present in the DB but absent from *items* are soft-deleted
    (``deleted_at`` stamped); slugs new to the DB are inserted; unchanged
    slugs are left untouched. An empty/falsy *items* is a no-op for the
    item sync (the listing row itself is still created/updated).
    """
    res = await session.execute(
        select(Listing).where(
            Listing.top_id == top_id,
            Listing.sub_id == sub_id,
            Listing.deleted_at.is_(None),
        )
    )
    listing = res.scalar_one_or_none()
    if not listing:
        listing = Listing(top_id=top_id, sub_id=sub_id, total_pages=total_pages)
        session.add(listing)
        await session.flush()  # flush so listing.id is available for item rows
    else:
        listing.total_pages = total_pages

    # Normalize incoming slugs: keep only non-empty strings, deduplicated.
    incoming: set[str] = {s for s in (items or []) if s and isinstance(s, str)}
    if not incoming:
        return

    # Slugs currently live in the DB for this listing.
    res = await session.execute(
        select(ListingItem.slug).where(
            ListingItem.listing_id == listing.id,
            ListingItem.deleted_at.is_(None),
        )
    )
    existing_slugs = set(res.scalars().all())

    # NOTE(review): naive UTC timestamp, matching the original code; confirm
    # the column is tz-naive before migrating to datetime.now(timezone.utc).
    now = datetime.utcnow()

    # Soft-delete slugs present in the DB but missing from the new data.
    to_delete = existing_slugs - incoming
    if to_delete:
        await session.execute(
            update(ListingItem)
            .where(
                ListingItem.listing_id == listing.id,
                ListingItem.slug.in_(to_delete),
                ListingItem.deleted_at.is_(None),
            )
            .values(deleted_at=now)
        )

    # Insert slugs that are new for this listing.
    to_insert = incoming - existing_slugs
    if to_insert:
        stmt = pg_insert(ListingItem).values(
            [{"listing_id": listing.id, "slug": s} for s in to_insert]
        )
        # NOTE(review): without ON CONFLICT DO NOTHING a concurrent writer can
        # raise a unique violation here. Re-enable
        #   .on_conflict_do_nothing(constraint="uq_listing_items_listing_slug")
        # once the constraint name is confirmed against the schema.
        await session.execute(stmt)


async def _nav_ids_from_list_url(
    session: AsyncSession, list_url: str
) -> Tuple[int, Optional[int]]:
    """Map a listing URL path (``/<top>/<sub>.html``) to nav (top_id, sub_id).

    The top slug is lowercased; a trailing ``.html``/``.htm`` (any case) is
    stripped from the sub slug before lookup.
    """
    parts = [x for x in (urlparse(list_url).path or "").split("/") if x]
    top_slug = parts[0].lower() if parts else ""
    sub_slug: Optional[str] = None
    if len(parts) >= 2:
        sub_slug = parts[1]
        if sub_slug.lower().endswith((".html", ".htm")):
            sub_slug = _HTML_SUFFIX_RE.sub("", sub_slug)
    return await _get_nav_ids(session, top_slug, sub_slug)


async def _get_nav_ids(
    session: AsyncSession, top_slug: str, sub_slug: Optional[str]
) -> Tuple[int, Optional[int]]:
    """Look up (NavTop.id, NavSub.id) for the given slugs.

    Raises:
        ValueError: If the top slug is unknown, or a sub slug was given but
            does not exist under the resolved top category.
    """
    res_top = await session.execute(
        select(NavTop.id).where(
            NavTop.slug == top_slug,
            NavTop.deleted_at.is_(None),
        )
    )
    top_id = res_top.scalar_one_or_none()
    if not top_id:
        raise ValueError(f"NavTop not found for slug: {top_slug}")

    sub_id = None
    if sub_slug:
        res_sub = await session.execute(
            select(NavSub.id).where(
                NavSub.slug == sub_slug,
                NavSub.top_id == top_id,
                NavSub.deleted_at.is_(None),
            )
        )
        sub_id = res_sub.scalar_one_or_none()
        if sub_id is None:
            raise ValueError(
                f"NavSub not found for slug: {sub_slug} under top_id={top_id}"
            )
    return top_id, sub_id