"""SQL-backed CartService implementation. Queries ``shared.models.market.CartItem`` — only this module may write to cart-domain tables on behalf of other domains. """ from __future__ import annotations from decimal import Decimal from sqlalchemy import select, update, func from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload from shared.models.market import CartItem from shared.models.market_place import MarketPlace from shared.models.calendars import CalendarEntry, Calendar from shared.contracts.dtos import CartItemDTO, CartSummaryDTO def _item_to_dto(ci: CartItem) -> CartItemDTO: product = ci.product return CartItemDTO( id=ci.id, product_id=ci.product_id, quantity=ci.quantity, product_title=product.title if product else None, product_slug=product.slug if product else None, product_image=product.image if product else None, unit_price=Decimal(str(product.special_price or product.regular_price or 0)) if product else None, market_place_id=ci.market_place_id, ) class SqlCartService: async def cart_summary( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, page_slug: str | None = None, ) -> CartSummaryDTO: """Build a lightweight cart summary for the current identity.""" # Resolve page filter page_post_id: int | None = None if page_slug: from shared.services.registry import services post = await services.blog.get_post_by_slug(session, page_slug) if post and post.is_page: page_post_id = post.id # --- product cart --- cart_q = select(CartItem).where(CartItem.deleted_at.is_(None)) if user_id is not None: cart_q = cart_q.where(CartItem.user_id == user_id) elif session_id is not None: cart_q = cart_q.where(CartItem.session_id == session_id) else: return CartSummaryDTO() if page_post_id is not None: mp_ids = select(MarketPlace.id).where( MarketPlace.container_type == "page", MarketPlace.container_id == page_post_id, MarketPlace.deleted_at.is_(None), ).scalar_subquery() cart_q = cart_q.where(CartItem.market_place_id.in_(mp_ids)) cart_q = cart_q.options(selectinload(CartItem.product)) result = await session.execute(cart_q) cart_items = result.scalars().all() count = sum(ci.quantity for ci in cart_items) total = sum( Decimal(str(ci.product.special_price or ci.product.regular_price or 0)) * ci.quantity for ci in cart_items if ci.product and (ci.product.special_price or ci.product.regular_price) ) # --- calendar entries --- from shared.services.registry import services if page_post_id is not None: cal_entries = await services.calendar.entries_for_page( session, page_post_id, user_id=user_id, session_id=session_id, ) else: cal_entries = await services.calendar.pending_entries( session, user_id=user_id, session_id=session_id, ) calendar_count = len(cal_entries) calendar_total = sum(Decimal(str(e.cost or 0)) for e in cal_entries if e.cost is not None) # --- tickets --- if page_post_id is not None: tickets = await services.calendar.tickets_for_page( session, page_post_id, user_id=user_id, session_id=session_id, ) else: tickets = await services.calendar.pending_tickets( session, user_id=user_id, session_id=session_id, ) ticket_count = len(tickets) ticket_total = sum(Decimal(str(t.price or 0)) for t in tickets) items = [_item_to_dto(ci) for ci in cart_items] return CartSummaryDTO( count=count, total=total, calendar_count=calendar_count, calendar_total=calendar_total, items=items, ticket_count=ticket_count, ticket_total=ticket_total, ) async def cart_items( self, session: AsyncSession, *, user_id: int | None, session_id: str | None, ) -> list[CartItemDTO]: cart_q = select(CartItem).where(CartItem.deleted_at.is_(None)) if user_id is not None: cart_q = cart_q.where(CartItem.user_id == user_id) elif session_id is not None: cart_q = cart_q.where(CartItem.session_id == session_id) else: return [] cart_q = cart_q.options(selectinload(CartItem.product)).order_by(CartItem.created_at.desc()) result = await session.execute(cart_q) return [_item_to_dto(ci) for ci in result.scalars().all()] async def adopt_cart_for_user( self, session: AsyncSession, user_id: int, session_id: str, ) -> None: """Adopt anonymous cart items for a logged-in user.""" anon_result = await session.execute( select(CartItem).where( CartItem.deleted_at.is_(None), CartItem.user_id.is_(None), CartItem.session_id == session_id, ) ) anon_items = anon_result.scalars().all() if anon_items: # Soft-delete existing user cart await session.execute( update(CartItem) .where(CartItem.deleted_at.is_(None), CartItem.user_id == user_id) .values(deleted_at=func.now()) ) for ci in anon_items: ci.user_id = user_id