"""SQL-backed CartService implementation. Queries ``shared.models.market.CartItem`` — only this module may write to cart-domain tables on behalf of other domains. """ from __future__ import annotations from decimal import Decimal from sqlalchemy import select, update, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models.market import CartItem from shared.models.market_place import MarketPlace from shared.contracts.dtos import CartItemDTO, CartSummaryDTO def _item_to_dto(ci: CartItem) -> CartItemDTO: product = ci.product return CartItemDTO( id=ci.id, product_id=ci.product_id, quantity=ci.quantity, product_title=product.title if product else None, product_slug=product.slug if product else None, product_image=product.image if product else None, unit_price=Decimal(str(product.special_price or product.regular_price or 0)) if product else None, market_place_id=ci.market_place_id, ) class SqlCartService: async def cart_summary( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, page_slug: str | None = None, ) -> CartSummaryDTO: """Build a lightweight cart summary for the current identity.""" from shared.infrastructure.data_client import fetch_data from shared.contracts.dtos import CalendarEntryDTO, TicketDTO, dto_from_dict # Resolve page filter via blog data endpoint page_post_id: int | None = None if page_slug: post = await fetch_data("blog", "post-by-slug", params={"slug": page_slug}, required=False) if post and post.get("is_page"): page_post_id = post["id"] # --- product cart --- cart_q = select(CartItem).where(CartItem.deleted_at.is_(None)) if user_id is not None: cart_q = cart_q.where(CartItem.user_id == user_id) elif session_id is not None: cart_q = cart_q.where(CartItem.session_id == session_id) else: return CartSummaryDTO() if page_post_id is not None: mp_ids = select(MarketPlace.id).where( MarketPlace.container_type == "page", MarketPlace.container_id == page_post_id, MarketPlace.deleted_at.is_(None), ).scalar_subquery() cart_q = cart_q.where(CartItem.market_place_id.in_(mp_ids)) cart_q = cart_q.options(selectinload(CartItem.product)) result = await session.execute(cart_q) cart_items = result.scalars().all() count = sum(ci.quantity for ci in cart_items) total = sum( Decimal(str(ci.product.special_price or ci.product.regular_price or 0)) * ci.quantity for ci in cart_items if ci.product and (ci.product.special_price or ci.product.regular_price) ) # --- calendar entries via events data endpoint --- cal_params: dict = {} if user_id is not None: cal_params["user_id"] = user_id if session_id is not None: cal_params["session_id"] = session_id if page_post_id is not None: cal_params["page_id"] = page_post_id raw_entries = await fetch_data("events", "entries-for-page", params=cal_params, required=False) or [] else: raw_entries = await fetch_data("events", "pending-entries", params=cal_params, required=False) or [] cal_entries = [dto_from_dict(CalendarEntryDTO, e) for e in raw_entries] calendar_count = len(cal_entries) calendar_total = sum(Decimal(str(e.cost or 0)) for e in cal_entries if e.cost is not None) # --- tickets via events data endpoint --- if page_post_id is not None: raw_tickets = await fetch_data("events", "tickets-for-page", params=cal_params, required=False) or [] else: tk_params = {k: v for k, v in cal_params.items() if k != "page_id"} raw_tickets = await fetch_data("events", "pending-tickets", params=tk_params, required=False) or [] tickets = [dto_from_dict(TicketDTO, t) for t in raw_tickets] ticket_count = len(tickets) ticket_total = sum(Decimal(str(t.price or 0)) for t in tickets) items = [_item_to_dto(ci) for ci in cart_items] return CartSummaryDTO( count=count, total=total, calendar_count=calendar_count, calendar_total=calendar_total, items=items, ticket_count=ticket_count, ticket_total=ticket_total, ) async def cart_items( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, ) -> list[CartItemDTO]: cart_q = select(CartItem).where(CartItem.deleted_at.is_(None)) if user_id is not None: cart_q = cart_q.where(CartItem.user_id == user_id) elif session_id is not None: cart_q = cart_q.where(CartItem.session_id == session_id) else: return [] cart_q = cart_q.options(selectinload(CartItem.product)).order_by(CartItem.created_at.desc()) result = await session.execute(cart_q) return [_item_to_dto(ci) for ci in result.scalars().all()] async def adopt_cart_for_user( self, session: AsyncSession, user_id: int, session_id: str, ) -> None: """Adopt anonymous cart items for a logged-in user.""" anon_result = await session.execute( select(CartItem).where( CartItem.deleted_at.is_(None), CartItem.user_id.is_(None), CartItem.session_id == session_id, ) ) anon_items = anon_result.scalars().all() if anon_items: # Soft-delete existing user cart await session.execute( update(CartItem) .where(CartItem.deleted_at.is_(None), CartItem.user_id == user_id) .values(deleted_at=func.now()) ) for ci in anon_items: ci.user_id = user_id