Remove cross-DB relationships (CartItem.product, CartItem.market_place, OrderItem.product) that break with per-service databases. Denormalize product and marketplace fields onto cart_items/order_items at write time. - Add AP internal inbox infrastructure (shared/infrastructure/internal_inbox*) for synchronous inter-service writes via HMAC-authenticated POST - Cart inbox blueprint handles Add/Remove/Update rose:CartItem activities - Market app sends AP activities to cart inbox instead of writing CartItem directly - Cart services use denormalized columns instead of cross-DB hydration/joins - Add marketplaces-by-ids data endpoint to market service - Alembic migration adds denormalized columns to cart_items and order_items - Add OAuth device flow auth to market scraper persist_api (artdag client pattern) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
267 lines
8.3 KiB
Python
267 lines
8.3 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Optional
|
|
from urllib.parse import urlencode
|
|
|
|
from types import SimpleNamespace
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import selectinload
|
|
|
|
from shared.models.market import CartItem
|
|
from shared.models.order import Order, OrderItem
|
|
from shared.config import config
|
|
from shared.contracts.dtos import CalendarEntryDTO
|
|
from shared.events import emit_activity
|
|
from shared.infrastructure.actions import call_action
|
|
from shared.infrastructure.data_client import fetch_data
|
|
|
|
|
|
async def find_or_create_cart_item(
|
|
session: AsyncSession,
|
|
product_id: int,
|
|
user_id: Optional[int],
|
|
session_id: Optional[str],
|
|
*,
|
|
product_title: str | None = None,
|
|
product_slug: str | None = None,
|
|
product_image: str | None = None,
|
|
product_brand: str | None = None,
|
|
product_regular_price: float | None = None,
|
|
product_special_price: float | None = None,
|
|
product_price_currency: str | None = None,
|
|
market_place_id: int | None = None,
|
|
market_place_name: str | None = None,
|
|
market_place_container_id: int | None = None,
|
|
) -> Optional[CartItem]:
|
|
"""
|
|
Find an existing cart item for this product/identity, or create a new one.
|
|
Returns None if product data is missing.
|
|
Increments quantity if item already exists.
|
|
"""
|
|
if not product_id:
|
|
return None
|
|
|
|
# Look for existing cart item
|
|
filters = [
|
|
CartItem.deleted_at.is_(None),
|
|
CartItem.product_id == product_id,
|
|
]
|
|
if user_id is not None:
|
|
filters.append(CartItem.user_id == user_id)
|
|
else:
|
|
filters.append(CartItem.session_id == session_id)
|
|
|
|
existing = await session.scalar(select(CartItem).where(*filters))
|
|
|
|
if existing:
|
|
existing.quantity += 1
|
|
return existing
|
|
else:
|
|
cart_item = CartItem(
|
|
user_id=user_id,
|
|
session_id=session_id,
|
|
product_id=product_id,
|
|
quantity=1,
|
|
market_place_id=market_place_id,
|
|
product_title=product_title,
|
|
product_slug=product_slug,
|
|
product_image=product_image,
|
|
product_brand=product_brand,
|
|
product_regular_price=product_regular_price,
|
|
product_special_price=product_special_price,
|
|
product_price_currency=product_price_currency,
|
|
market_place_name=market_place_name,
|
|
market_place_container_id=market_place_container_id,
|
|
)
|
|
session.add(cart_item)
|
|
return cart_item
|
|
|
|
|
|
async def resolve_page_config(
|
|
session: AsyncSession,
|
|
cart: list[CartItem],
|
|
calendar_entries: list[CalendarEntryDTO],
|
|
tickets=None,
|
|
) -> Optional[SimpleNamespace]:
|
|
"""Determine the PageConfig for this order.
|
|
|
|
Returns a PageConfig namespace or None (use global credentials).
|
|
Raises ValueError if items span multiple pages.
|
|
"""
|
|
post_ids: set[int] = set()
|
|
|
|
# From cart items via denormalized market_place_container_id
|
|
for ci in cart:
|
|
if ci.market_place_container_id:
|
|
post_ids.add(ci.market_place_container_id)
|
|
|
|
# From calendar entries via calendar
|
|
for entry in calendar_entries:
|
|
if entry.calendar_container_id:
|
|
post_ids.add(entry.calendar_container_id)
|
|
|
|
# From tickets via calendar_container_id
|
|
for tk in (tickets or []):
|
|
if tk.calendar_container_id:
|
|
post_ids.add(tk.calendar_container_id)
|
|
|
|
if len(post_ids) > 1:
|
|
raise ValueError("Cannot checkout items from multiple pages")
|
|
|
|
if not post_ids:
|
|
return None # global credentials
|
|
|
|
post_id = post_ids.pop()
|
|
raw_pc = await fetch_data(
|
|
"blog", "page-config",
|
|
params={"container_type": "page", "container_id": post_id},
|
|
required=False,
|
|
)
|
|
return SimpleNamespace(**raw_pc) if raw_pc else None
|
|
|
|
|
|
async def create_order_from_cart(
|
|
session: AsyncSession,
|
|
cart: list[CartItem],
|
|
calendar_entries: list[CalendarEntryDTO],
|
|
user_id: Optional[int],
|
|
session_id: Optional[str],
|
|
product_total: float,
|
|
calendar_total: float,
|
|
*,
|
|
ticket_total: float = 0,
|
|
page_post_id: int | None = None,
|
|
) -> Order:
|
|
"""
|
|
Create an Order and OrderItems from the current cart + calendar entries + tickets.
|
|
|
|
When *page_post_id* is given, only calendar entries/tickets whose calendar
|
|
belongs to that page are marked as "ordered". Otherwise all pending
|
|
entries are updated (legacy behaviour).
|
|
"""
|
|
cart_total = product_total + calendar_total + ticket_total
|
|
|
|
# Determine currency from first product
|
|
currency = (cart[0].product_price_currency if cart else None) or "GBP"
|
|
|
|
# Create order
|
|
order = Order(
|
|
user_id=user_id,
|
|
session_id=session_id,
|
|
status="pending",
|
|
currency=currency,
|
|
total_amount=cart_total,
|
|
)
|
|
session.add(order)
|
|
await session.flush()
|
|
|
|
# Create order items from cart
|
|
for ci in cart:
|
|
price = ci.product_special_price or ci.product_regular_price or 0
|
|
oi = OrderItem(
|
|
order=order,
|
|
product_id=ci.product_id,
|
|
product_title=ci.product_title,
|
|
product_slug=ci.product_slug,
|
|
product_image=ci.product_image,
|
|
quantity=ci.quantity,
|
|
unit_price=price,
|
|
currency=currency,
|
|
)
|
|
session.add(oi)
|
|
|
|
# Mark pending calendar entries as "ordered" via events action endpoint
|
|
await call_action("events", "claim-entries-for-order", payload={
|
|
"order_id": order.id, "user_id": user_id,
|
|
"session_id": session_id, "page_post_id": page_post_id,
|
|
})
|
|
|
|
# Claim reserved tickets for this order
|
|
await call_action("events", "claim-tickets-for-order", payload={
|
|
"order_id": order.id, "user_id": user_id,
|
|
"session_id": session_id, "page_post_id": page_post_id,
|
|
})
|
|
|
|
await emit_activity(
|
|
session,
|
|
activity_type="Create",
|
|
actor_uri="internal:cart",
|
|
object_type="rose:Order",
|
|
object_data={
|
|
"order_id": order.id,
|
|
"user_id": user_id,
|
|
"session_id": session_id,
|
|
},
|
|
source_type="order",
|
|
source_id=order.id,
|
|
)
|
|
|
|
return order
|
|
|
|
|
|
def build_sumup_description(cart: list[CartItem], order_id: int, *, ticket_count: int = 0) -> str:
|
|
"""Build a human-readable description for SumUp checkout."""
|
|
titles = [ci.product_title for ci in cart if ci.product_title]
|
|
item_count = sum(ci.quantity for ci in cart)
|
|
|
|
parts = []
|
|
if titles:
|
|
if len(titles) <= 3:
|
|
parts.append(", ".join(titles))
|
|
else:
|
|
parts.append(", ".join(titles[:3]) + f" + {len(titles) - 3} more")
|
|
if ticket_count:
|
|
parts.append(f"{ticket_count} ticket{'s' if ticket_count != 1 else ''}")
|
|
|
|
summary = ", ".join(parts) if parts else "order items"
|
|
total_count = item_count + ticket_count
|
|
|
|
return f"Order {order_id} ({total_count} item{'s' if total_count != 1 else ''}): {summary}"
|
|
|
|
|
|
def build_sumup_reference(order_id: int, page_config=None) -> str:
|
|
"""Build a SumUp reference with configured prefix."""
|
|
if page_config and page_config.sumup_checkout_prefix:
|
|
prefix = page_config.sumup_checkout_prefix
|
|
else:
|
|
sumup_cfg = config().get("sumup", {}) or {}
|
|
prefix = sumup_cfg.get("checkout_reference_prefix", "")
|
|
return f"{prefix}{order_id}"
|
|
|
|
|
|
def build_webhook_url(base_url: str) -> str:
|
|
"""Add webhook secret token to URL if configured."""
|
|
sumup_cfg = config().get("sumup", {}) or {}
|
|
webhook_secret = sumup_cfg.get("webhook_secret")
|
|
|
|
if webhook_secret:
|
|
sep = "&" if "?" in base_url else "?"
|
|
return f"{base_url}{sep}{urlencode({'token': webhook_secret})}"
|
|
|
|
return base_url
|
|
|
|
|
|
def validate_webhook_secret(token: Optional[str]) -> bool:
|
|
"""Validate webhook token against configured secret."""
|
|
sumup_cfg = config().get("sumup", {}) or {}
|
|
webhook_secret = sumup_cfg.get("webhook_secret")
|
|
|
|
if not webhook_secret:
|
|
return True # No secret configured, allow all
|
|
|
|
return token is not None and token == webhook_secret
|
|
|
|
|
|
async def get_order_with_details(session: AsyncSession, order_id: int) -> Optional[Order]:
|
|
"""Fetch an order with items eagerly loaded."""
|
|
result = await session.execute(
|
|
select(Order)
|
|
.options(
|
|
selectinload(Order.items),
|
|
)
|
|
.where(Order.id == order_id)
|
|
)
|
|
return result.scalar_one_or_none()
|