diff --git a/CLAUDE.md b/CLAUDE.md index afb00d7..a9588b1 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -2,6 +2,10 @@ Federated content-addressed DAG execution engine for distributed media processing with ActivityPub ownership and provenance tracking. +## Deployment + +- **Do NOT push** until explicitly told to. Pushes reload code to dev automatically. + ## Project Structure ``` @@ -54,6 +58,9 @@ docker build -f l2/Dockerfile -t l2-server:latest . - **Celery Tasks**: In `l1/tasks/`, decorated with `@app.task` - **S-Expression Effects**: Composable effect language in `l1/sexp_effects/` - **Storage**: Local filesystem, S3, or IPFS backends (`storage_providers.py`) +- **Inter-Service Reads**: `fetch_data()` → GET `/internal/data/{query}` (HMAC-signed) +- **Inter-Service Actions**: `call_action()` → POST `/internal/actions/{name}` (HMAC-signed) +- **Inter-Service AP Inbox**: `send_internal_activity()` → POST `/internal/inbox` (HMAC-signed, AP-shaped activities for cross-service writes with denormalized data) ## Auth diff --git a/cart/alembic/versions/0002_denormalize_product_data.py b/cart/alembic/versions/0002_denormalize_product_data.py new file mode 100644 index 0000000..d9396b0 --- /dev/null +++ b/cart/alembic/versions/0002_denormalize_product_data.py @@ -0,0 +1,48 @@ +"""Denormalize product and marketplace data onto cart_items and order_items. + +Revision ID: cart_0002 +Revises: cart_0001 +Create Date: 2026-02-26 +""" + +import sqlalchemy as sa +from alembic import op + +revision = "cart_0002" +down_revision = "cart_0001" +branch_labels = None +depends_on = None + + +def upgrade(): + # -- cart_items: denormalized product data -- + op.add_column("cart_items", sa.Column("product_title", sa.String(512), nullable=True)) + op.add_column("cart_items", sa.Column("product_slug", sa.String(512), nullable=True)) + op.add_column("cart_items", sa.Column("product_image", sa.Text, nullable=True)) + op.add_column("cart_items", sa.Column("product_brand", sa.String(255), nullable=True)) + op.add_column("cart_items", sa.Column("product_regular_price", sa.Numeric(12, 2), nullable=True)) + op.add_column("cart_items", sa.Column("product_special_price", sa.Numeric(12, 2), nullable=True)) + op.add_column("cart_items", sa.Column("product_price_currency", sa.String(16), nullable=True)) + + # -- cart_items: denormalized marketplace data -- + op.add_column("cart_items", sa.Column("market_place_name", sa.String(255), nullable=True)) + op.add_column("cart_items", sa.Column("market_place_container_id", sa.Integer, nullable=True)) + + # -- order_items: denormalized product fields -- + op.add_column("order_items", sa.Column("product_slug", sa.String(512), nullable=True)) + op.add_column("order_items", sa.Column("product_image", sa.Text, nullable=True)) + + +def downgrade(): + op.drop_column("order_items", "product_image") + op.drop_column("order_items", "product_slug") + + op.drop_column("cart_items", "market_place_container_id") + op.drop_column("cart_items", "market_place_name") + op.drop_column("cart_items", "product_price_currency") + op.drop_column("cart_items", "product_special_price") + op.drop_column("cart_items", "product_regular_price") + op.drop_column("cart_items", "product_brand") + op.drop_column("cart_items", "product_image") + op.drop_column("cart_items", "product_slug") + op.drop_column("cart_items", "product_title") diff --git a/cart/app.py b/cart/app.py index f02ffc3..8643f7b 100644 --- a/cart/app.py +++ b/cart/app.py @@ -19,6 +19,7 @@ from bp import ( register_fragments, register_actions, register_data, + register_inbox, ) from bp.cart.services import ( get_cart, @@ -143,6 +144,7 @@ def create_app() -> "Quart": app.register_blueprint(register_fragments()) app.register_blueprint(register_actions()) app.register_blueprint(register_data()) + app.register_blueprint(register_inbox()) # --- Page slug hydration (follows events/market app pattern) --- diff --git a/cart/bp/__init__.py b/cart/bp/__init__.py index 43a64c1..72d4f84 100644 --- a/cart/bp/__init__.py +++ b/cart/bp/__init__.py @@ -6,3 +6,4 @@ from .orders.routes import register as register_orders from .fragments import register_fragments from .actions import register_actions from .data import register_data +from .inbox import register_inbox diff --git a/cart/bp/cart/global_routes.py b/cart/bp/cart/global_routes.py index ad528af..dd24665 100644 --- a/cart/bp/cart/global_routes.py +++ b/cart/bp/cart/global_routes.py @@ -7,7 +7,6 @@ from sqlalchemy import select from shared.models.market import CartItem from shared.models.order import Order -from shared.models.market_place import MarketPlace from shared.infrastructure.actions import call_action from .services import ( current_cart_identity, @@ -265,16 +264,14 @@ def register(url_prefix: str) -> Blueprint: required=False) if raw_pc else None if post: g.page_slug = post["slug"] - result = await g.s.execute( - select(MarketPlace).where( - MarketPlace.container_type == "page", - MarketPlace.container_id == post["id"], - MarketPlace.deleted_at.is_(None), - ).limit(1) - ) - mp = result.scalar_one_or_none() - if mp: - g.market_slug = mp.slug + # Fetch marketplace slug from market service + mps = await fetch_data( + "market", "marketplaces-for-container", + params={"type": "page", "id": post["id"]}, + required=False, + ) or [] + if mps: + g.market_slug = mps[0].get("slug") if order.sumup_checkout_id: try: diff --git a/cart/bp/cart/services/checkout.py b/cart/bp/cart/services/checkout.py index 7d9f3e9..1063175 100644 --- a/cart/bp/cart/services/checkout.py +++ b/cart/bp/cart/services/checkout.py @@ -9,9 +9,8 @@ from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import selectinload -from shared.models.market import Product, CartItem +from shared.models.market import CartItem from shared.models.order import Order, OrderItem -from shared.models.market_place import MarketPlace from shared.config import config from shared.contracts.dtos import CalendarEntryDTO from shared.events import emit_activity @@ -24,17 +23,24 @@ async def find_or_create_cart_item( product_id: int, user_id: Optional[int], session_id: Optional[str], + *, + product_title: str | None = None, + product_slug: str | None = None, + product_image: str | None = None, + product_brand: str | None = None, + product_regular_price: float | None = None, + product_special_price: float | None = None, + product_price_currency: str | None = None, + market_place_id: int | None = None, + market_place_name: str | None = None, + market_place_container_id: int | None = None, ) -> Optional[CartItem]: """ Find an existing cart item for this product/identity, or create a new one. - Returns None if the product doesn't exist. + Returns None if product data is missing. Increments quantity if item already exists. """ - # Make sure product exists - product = await session.scalar( - select(Product).where(Product.id == product_id) - ) - if not product: + if not product_id: return None # Look for existing cart item @@ -56,8 +62,18 @@ async def find_or_create_cart_item( cart_item = CartItem( user_id=user_id, session_id=session_id, - product_id=product.id, + product_id=product_id, quantity=1, + market_place_id=market_place_id, + product_title=product_title, + product_slug=product_slug, + product_image=product_image, + product_brand=product_brand, + product_regular_price=product_regular_price, + product_special_price=product_special_price, + product_price_currency=product_price_currency, + market_place_name=market_place_name, + market_place_container_id=market_place_container_id, ) session.add(cart_item) return cart_item @@ -76,12 +92,10 @@ async def resolve_page_config( """ post_ids: set[int] = set() - # From cart items via market_place + # From cart items via denormalized market_place_container_id for ci in cart: - if ci.market_place_id: - mp = await session.get(MarketPlace, ci.market_place_id) - if mp: - post_ids.add(mp.container_id) + if ci.market_place_container_id: + post_ids.add(ci.market_place_container_id) # From calendar entries via calendar for entry in calendar_entries: @@ -130,8 +144,7 @@ async def create_order_from_cart( cart_total = product_total + calendar_total + ticket_total # Determine currency from first product - first_product = cart[0].product if cart else None - currency = (first_product.regular_price_currency if first_product else None) or "GBP" + currency = (cart[0].product_price_currency if cart else None) or "GBP" # Create order order = Order( @@ -146,11 +159,13 @@ async def create_order_from_cart( # Create order items from cart for ci in cart: - price = ci.product.special_price or ci.product.regular_price or 0 + price = ci.product_special_price or ci.product_regular_price or 0 oi = OrderItem( order=order, - product_id=ci.product.id, - product_title=ci.product.title, + product_id=ci.product_id, + product_title=ci.product_title, + product_slug=ci.product_slug, + product_image=ci.product_image, quantity=ci.quantity, unit_price=price, currency=currency, @@ -188,7 +203,7 @@ async def create_order_from_cart( def build_sumup_description(cart: list[CartItem], order_id: int, *, ticket_count: int = 0) -> str: """Build a human-readable description for SumUp checkout.""" - titles = [ci.product.title for ci in cart if ci.product and ci.product.title] + titles = [ci.product_title for ci in cart if ci.product_title] item_count = sum(ci.quantity for ci in cart) parts = [] @@ -240,11 +255,11 @@ def validate_webhook_secret(token: Optional[str]) -> bool: async def get_order_with_details(session: AsyncSession, order_id: int) -> Optional[Order]: - """Fetch an order with items and calendar entries eagerly loaded.""" + """Fetch an order with items eagerly loaded.""" result = await session.execute( select(Order) .options( - selectinload(Order.items).selectinload(OrderItem.product), + selectinload(Order.items), ) .where(Order.id == order_id) ) diff --git a/cart/bp/cart/services/clear_cart_for_order.py b/cart/bp/cart/services/clear_cart_for_order.py index 3643839..11736ee 100644 --- a/cart/bp/cart/services/clear_cart_for_order.py +++ b/cart/bp/cart/services/clear_cart_for_order.py @@ -1,7 +1,6 @@ -from sqlalchemy import update, func, select +from sqlalchemy import update, func from shared.models.market import CartItem -from shared.models.market_place import MarketPlace from shared.models.order import Order @@ -23,12 +22,7 @@ async def clear_cart_for_order(session, order: Order, *, page_post_id: int | Non return if page_post_id is not None: - mp_ids = select(MarketPlace.id).where( - MarketPlace.container_type == "page", - MarketPlace.container_id == page_post_id, - MarketPlace.deleted_at.is_(None), - ).scalar_subquery() - filters.append(CartItem.market_place_id.in_(mp_ids)) + filters.append(CartItem.market_place_container_id == page_post_id) await session.execute( update(CartItem) diff --git a/cart/bp/cart/services/get_cart.py b/cart/bp/cart/services/get_cart.py index ad1c0ce..ef50025 100644 --- a/cart/bp/cart/services/get_cart.py +++ b/cart/bp/cart/services/get_cart.py @@ -1,25 +1,55 @@ +from types import SimpleNamespace + from sqlalchemy import select -from sqlalchemy.orm import selectinload from shared.models.market import CartItem from .identity import current_cart_identity -async def get_cart(session): - ident = current_cart_identity() - - filters = [CartItem.deleted_at.is_(None)] - if ident["user_id"] is not None: - filters.append(CartItem.user_id == ident["user_id"]) - else: - filters.append(CartItem.session_id == ident["session_id"]) - result = await session.execute( - select(CartItem) - .where(*filters) - .order_by(CartItem.created_at.desc()) - .options( - selectinload(CartItem.product), - selectinload(CartItem.market_place), - ) - ) - return result.scalars().all() +def _attach_product_namespace(ci: CartItem) -> None: + """Build a SimpleNamespace 'product' from denormalized columns for template compat.""" + ci.product = SimpleNamespace( + id=ci.product_id, + title=ci.product_title, + slug=ci.product_slug, + image=ci.product_image, + brand=ci.product_brand, + regular_price=ci.product_regular_price, + special_price=ci.product_special_price, + regular_price_currency=ci.product_price_currency, + ) + + +def _attach_market_place_namespace(ci: CartItem) -> None: + """Build a SimpleNamespace 'market_place' from denormalized columns.""" + if ci.market_place_id: + ci.market_place = SimpleNamespace( + id=ci.market_place_id, + name=ci.market_place_name, + container_id=ci.market_place_container_id, + ) + else: + ci.market_place = None + + +async def get_cart(session): + ident = current_cart_identity() + + filters = [CartItem.deleted_at.is_(None)] + if ident["user_id"] is not None: + filters.append(CartItem.user_id == ident["user_id"]) + else: + filters.append(CartItem.session_id == ident["session_id"]) + + result = await session.execute( + select(CartItem) + .where(*filters) + .order_by(CartItem.created_at.desc()) + ) + items = list(result.scalars().all()) + + for ci in items: + _attach_product_namespace(ci) + _attach_market_place_namespace(ci) + + return items diff --git a/cart/bp/cart/services/page_cart.py b/cart/bp/cart/services/page_cart.py index afb92cd..cf20f8a 100644 --- a/cart/bp/cart/services/page_cart.py +++ b/cart/bp/cart/services/page_cart.py @@ -2,7 +2,7 @@ Page-scoped cart queries. Groups cart items and calendar entries by their owning page (Post), -determined via CartItem.market_place.container_id and CalendarEntry.calendar.container_id +determined via CartItem.market_place_container_id (where container_type == "page"). """ from __future__ import annotations @@ -12,24 +12,21 @@ from collections import defaultdict from types import SimpleNamespace from sqlalchemy import select -from sqlalchemy.orm import selectinload from shared.models.market import CartItem -from shared.models.market_place import MarketPlace from shared.infrastructure.data_client import fetch_data from shared.contracts.dtos import CalendarEntryDTO, TicketDTO, PostDTO, dto_from_dict from .identity import current_cart_identity +from .get_cart import _attach_product_namespace, _attach_market_place_namespace async def get_cart_for_page(session, post_id: int) -> list[CartItem]: - """Return cart items scoped to a specific page (via MarketPlace.container_id).""" + """Return cart items scoped to a specific page (via denormalized market_place_container_id).""" ident = current_cart_identity() filters = [ CartItem.deleted_at.is_(None), - MarketPlace.container_type == "page", - MarketPlace.container_id == post_id, - MarketPlace.deleted_at.is_(None), + CartItem.market_place_container_id == post_id, ] if ident["user_id"] is not None: filters.append(CartItem.user_id == ident["user_id"]) @@ -38,15 +35,16 @@ async def get_cart_for_page(session, post_id: int) -> list[CartItem]: result = await session.execute( select(CartItem) - .join(MarketPlace, CartItem.market_place_id == MarketPlace.id) .where(*filters) .order_by(CartItem.created_at.desc()) - .options( - selectinload(CartItem.product), - selectinload(CartItem.market_place), - ) ) - return result.scalars().all() + items = list(result.scalars().all()) + + for ci in items: + _attach_product_namespace(ci) + _attach_market_place_namespace(ci) + + return items async def get_calendar_entries_for_page(session, post_id: int): diff --git a/cart/bp/cart/services/total.py b/cart/bp/cart/services/total.py index 8dcdaf9..151fa06 100644 --- a/cart/bp/cart/services/total.py +++ b/cart/bp/cart/services/total.py @@ -4,10 +4,9 @@ from decimal import Decimal def total(cart): return sum( ( - Decimal(str(item.product.special_price or item.product.regular_price)) + Decimal(str(item.product_special_price or item.product_regular_price)) * item.quantity ) for item in cart - if (item.product.special_price or item.product.regular_price) is not None + if (item.product_special_price or item.product_regular_price) is not None ) - \ No newline at end of file diff --git a/cart/bp/data/routes.py b/cart/bp/data/routes.py index af5fce7..d401c15 100644 --- a/cart/bp/data/routes.py +++ b/cart/bp/data/routes.py @@ -83,7 +83,6 @@ def register() -> Blueprint: # --- cart-items (product slugs + quantities for template rendering) --- async def _cart_items(): from sqlalchemy import select - from sqlalchemy.orm import selectinload from shared.models.market import CartItem user_id = request.args.get("user_id", type=int) @@ -98,13 +97,13 @@ def register() -> Blueprint: return [] result = await g.s.execute( - select(CartItem).where(*filters).options(selectinload(CartItem.product)) + select(CartItem).where(*filters) ) items = result.scalars().all() return [ { "product_id": item.product_id, - "product_slug": item.product.slug if item.product else None, + "product_slug": item.product_slug, "quantity": item.quantity, } for item in items diff --git a/cart/bp/inbox/__init__.py b/cart/bp/inbox/__init__.py new file mode 100644 index 0000000..16e431f --- /dev/null +++ b/cart/bp/inbox/__init__.py @@ -0,0 +1 @@ +from .routes import register as register_inbox diff --git a/cart/bp/inbox/routes.py b/cart/bp/inbox/routes.py new file mode 100644 index 0000000..9fa9b67 --- /dev/null +++ b/cart/bp/inbox/routes.py @@ -0,0 +1,161 @@ +"""Cart internal inbox endpoint. + +Receives AP-shaped activities from other services via HMAC-authenticated +POST to ``/internal/inbox``. Routes to handlers registered via the +internal inbox dispatch infrastructure. +""" +from __future__ import annotations + +import logging +from datetime import datetime, timezone + +from quart import Blueprint, g, jsonify, request +from sqlalchemy import select + +from shared.infrastructure.internal_inbox import dispatch_internal_activity, register_internal_handler +from shared.infrastructure.internal_inbox_client import INBOX_HEADER +from shared.models.market import CartItem + +log = logging.getLogger(__name__) + + +def register() -> Blueprint: + bp = Blueprint("inbox", __name__, url_prefix="/internal/inbox") + + @bp.before_request + async def _require_inbox_header(): + if not request.headers.get(INBOX_HEADER): + return jsonify({"error": "forbidden"}), 403 + from shared.infrastructure.internal_auth import validate_internal_request + if not validate_internal_request(): + return jsonify({"error": "forbidden"}), 403 + + @bp.post("") + async def handle_inbox(): + body = await request.get_json() + if not body: + return jsonify({"error": "empty body"}), 400 + try: + result = await dispatch_internal_activity(g.s, body) + return jsonify(result) + except ValueError as exc: + return jsonify({"error": str(exc)}), 400 + except Exception as exc: + log.exception("Internal inbox dispatch failed") + return jsonify({"error": str(exc)}), 500 + + # --- Handler: Add rose:CartItem --- + async def _handle_add_cart_item(session, body: dict) -> dict: + obj = body["object"] + user_id = obj.get("user_id") + session_id = obj.get("session_id") + product_id = obj["product_id"] + count = obj.get("quantity", 1) + market_place_id = obj.get("market_place_id") + + # Look for existing cart item + filters = [ + CartItem.deleted_at.is_(None), + CartItem.product_id == product_id, + ] + if user_id is not None: + filters.append(CartItem.user_id == user_id) + else: + filters.append(CartItem.session_id == session_id) + + existing = await session.scalar(select(CartItem).where(*filters)) + + if existing: + if count > 0: + existing.quantity = count + else: + existing.deleted_at = datetime.now(timezone.utc) + ci = existing + else: + if count <= 0: + return {"ok": True, "action": "noop"} + ci = CartItem( + user_id=user_id, + session_id=session_id, + product_id=product_id, + quantity=count, + market_place_id=market_place_id, + # Denormalized product data + product_title=obj.get("product_title"), + product_slug=obj.get("product_slug"), + product_image=obj.get("product_image"), + product_brand=obj.get("product_brand"), + product_regular_price=obj.get("product_regular_price"), + product_special_price=obj.get("product_special_price"), + product_price_currency=obj.get("product_price_currency"), + # Denormalized marketplace data + market_place_name=obj.get("market_place_name"), + market_place_container_id=obj.get("market_place_container_id"), + ) + session.add(ci) + + await session.flush() + return { + "ok": True, + "cart_item_id": ci.id, + "quantity": ci.quantity, + } + + register_internal_handler("Add", "rose:CartItem", _handle_add_cart_item) + + # --- Handler: Remove rose:CartItem --- + async def _handle_remove_cart_item(session, body: dict) -> dict: + obj = body["object"] + user_id = obj.get("user_id") + session_id = obj.get("session_id") + product_id = obj["product_id"] + + filters = [ + CartItem.deleted_at.is_(None), + CartItem.product_id == product_id, + ] + if user_id is not None: + filters.append(CartItem.user_id == user_id) + else: + filters.append(CartItem.session_id == session_id) + + existing = await session.scalar(select(CartItem).where(*filters)) + if existing: + existing.deleted_at = datetime.now(timezone.utc) + await session.flush() + + return {"ok": True} + + register_internal_handler("Remove", "rose:CartItem", _handle_remove_cart_item) + + # --- Handler: Update rose:CartItem --- + async def _handle_update_cart_item(session, body: dict) -> dict: + obj = body["object"] + user_id = obj.get("user_id") + session_id = obj.get("session_id") + product_id = obj["product_id"] + quantity = obj.get("quantity", 1) + + filters = [ + CartItem.deleted_at.is_(None), + CartItem.product_id == product_id, + ] + if user_id is not None: + filters.append(CartItem.user_id == user_id) + else: + filters.append(CartItem.session_id == session_id) + + existing = await session.scalar(select(CartItem).where(*filters)) + if existing: + if quantity <= 0: + existing.deleted_at = datetime.now(timezone.utc) + else: + existing.quantity = quantity + await session.flush() + return {"ok": True, "quantity": existing.quantity} + + return {"ok": True, "action": "noop"} + + register_internal_handler("Update", "rose:CartItem", _handle_update_cart_item) + + return bp diff --git a/cart/bp/order/routes.py b/cart/bp/order/routes.py index 5432ce0..2452679 100644 --- a/cart/bp/order/routes.py +++ b/cart/bp/order/routes.py @@ -5,7 +5,6 @@ from sqlalchemy import select, func, or_, cast, String, exists from sqlalchemy.orm import selectinload -from shared.models.market import Product from shared.models.order import Order, OrderItem from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout from shared.config import config @@ -49,7 +48,7 @@ def register() -> Blueprint: result = await g.s.execute( select(Order) .options( - selectinload(Order.items).selectinload(OrderItem.product) + selectinload(Order.items) ) .where(Order.id == order_id, owner) ) diff --git a/cart/bp/orders/routes.py b/cart/bp/orders/routes.py index 2e1579b..a6fbd8a 100644 --- a/cart/bp/orders/routes.py +++ b/cart/bp/orders/routes.py @@ -5,7 +5,6 @@ from sqlalchemy import select, func, or_, cast, String, exists from sqlalchemy.orm import selectinload -from shared.models.market import Product from shared.models.order import Order, OrderItem from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout from shared.config import config @@ -86,16 +85,11 @@ def register(url_prefix: str) -> Blueprint: exists( select(1) .select_from(OrderItem) - .join(Product, Product.id == OrderItem.product_id) .where( OrderItem.order_id == Order.id, or_( OrderItem.product_title.ilike(term), - Product.title.ilike(term), - Product.description_short.ilike(term), - Product.description_html.ilike(term), - Product.slug.ilike(term), - Product.brand.ilike(term), + OrderItem.product_slug.ilike(term), ), ) ) diff --git a/cart/templates/_types/order/_items.html b/cart/templates/_types/order/_items.html index 27b2a9f..92d674d 100644 --- a/cart/templates/_types/order/_items.html +++ b/cart/templates/_types/order/_items.html @@ -7,13 +7,13 @@
- {{ item.product_title or (item.product and item.product.title) or 'Unknown product' }} + {{ item.product_title or 'Unknown product' }}
Product ID: {{ item.product_id }} diff --git a/market/bp/data/routes.py b/market/bp/data/routes.py index a1e6bf7..a24b90b 100644 --- a/market/bp/data/routes.py +++ b/market/bp/data/routes.py @@ -76,4 +76,35 @@ def register() -> Blueprint: _handlers["products-by-ids"] = _products_by_ids + # --- marketplaces-by-ids --- + async def _marketplaces_by_ids(): + """Return marketplace data for a list of IDs (comma-separated).""" + from sqlalchemy import select + from shared.models.market_place import MarketPlace + + ids_raw = request.args.get("ids", "") + try: + ids = [int(x) for x in ids_raw.split(",") if x.strip()] + except ValueError: + return {"error": "ids must be comma-separated integers"}, 400 + if not ids: + return [] + + rows = (await g.s.execute( + select(MarketPlace).where(MarketPlace.id.in_(ids)) + )).scalars().all() + + return [ + { + "id": m.id, + "name": m.name, + "slug": m.slug, + "container_type": m.container_type, + "container_id": m.container_id, + } + for m in rows + ] + + _handlers["marketplaces-by-ids"] = _marketplaces_by_ids + return bp diff --git a/market/bp/product/routes.py b/market/bp/product/routes.py index 19e76c4..56d4c72 100644 --- a/market/bp/product/routes.py +++ b/market/bp/product/routes.py @@ -162,29 +162,25 @@ def register(): from bp.cart.services.identity import current_cart_identity - #from bp.cart.routes import view_cart - from models.market import CartItem from quart import request, url_for + from shared.infrastructure.internal_inbox_client import send_internal_activity + from shared.infrastructure.data_client import fetch_data @bp.post("/cart/") @clear_cache(tag="browse", tag_scope="user") async def cart(): slug = g.product_slug - # make sure product exists (we *allow* deleted_at != None later if you want) - product_id = await g.s.scalar( - select(Product.id).where( + # Load product from local db_market + product = await g.s.scalar( + select(Product).where( Product.slug == slug, Product.deleted_at.is_(None), ) ) - - product = await g.s.scalar( - select(Product).where(Product.id == product_id) - ) if not product: return await make_response("Product not found", 404) - # --- NEW: read `count` from body (JSON or form), default to 1 --- + # Read `count` from body (JSON or form), default to 1 count = 1 try: if request.is_json: @@ -196,68 +192,72 @@ def register(): if "count" in form: count = int(form["count"]) except (ValueError, TypeError): - # if parsing fails, just fall back to 1 count = 1 - # --- END NEW --- ident = current_cart_identity() + market = getattr(g, "market", None) - # Load cart items for current user/session - from sqlalchemy.orm import selectinload - cart_filters = [CartItem.deleted_at.is_(None)] - if ident["user_id"] is not None: - cart_filters.append(CartItem.user_id == ident["user_id"]) - else: - cart_filters.append(CartItem.session_id == ident["session_id"]) - cart_result = await g.s.execute( - select(CartItem) - .where(*cart_filters) - .order_by(CartItem.created_at.desc()) - .options( - selectinload(CartItem.product), - selectinload(CartItem.market_place), + # Build AP activity with denormalized product data + activity_type = "Add" if count > 0 else "Remove" + activity = { + "type": activity_type, + "object": { + "type": "rose:CartItem", + "user_id": ident["user_id"], + "session_id": ident["session_id"], + "product_id": product.id, + "quantity": count, + "market_place_id": market.id if market else None, + # Denormalized product data + "product_title": product.title, + "product_slug": product.slug, + "product_image": product.image, + "product_brand": product.brand, + "product_regular_price": str(product.regular_price) if product.regular_price is not None else None, + "product_special_price": str(product.special_price) if product.special_price is not None else None, + "product_price_currency": product.regular_price_currency, + # Denormalized marketplace data + "market_place_name": market.name if market else None, + "market_place_container_id": market.container_id if market else None, + }, + } + + await send_internal_activity("cart", activity) + + # Fetch updated cart items from cart service for template rendering + raw_cart = await fetch_data( + "cart", "cart-items", + params={ + k: v for k, v in { + "user_id": ident["user_id"], + "session_id": ident["session_id"], + }.items() if v is not None + }, + required=False, + ) or [] + + # Build minimal cart list for template (product slug + quantity) + from types import SimpleNamespace + g.cart = [ + SimpleNamespace( + product_id=ci["product_id"], + product=SimpleNamespace(slug=ci["product_slug"]), + quantity=ci["quantity"], ) - ) - g.cart = list(cart_result.scalars().all()) + for ci in raw_cart + ] - ci = next( - (item for item in g.cart if item.product_id == product_id), + ci_ns = next( + (item for item in g.cart if item.product_id == product.id), None, ) - # --- NEW: set quantity based on `count` --- - if ci: - if count > 0: - ci.quantity = count - else: - # count <= 0 → remove from cart entirely - ci.quantity=0 - g.cart.remove(ci) - await g.s.delete(ci) - - else: - if count > 0: - ci = CartItem( - user_id=ident["user_id"], - session_id=ident["session_id"], - product_id=product.id, - product=product, - quantity=count, - market_place_id=getattr(g, "market", None) and g.market.id, - ) - g.cart.append(ci) - g.s.add(ci) - # if count <= 0 and no existing item, do nothing - # --- END NEW --- - - # no explicit commit; your session middleware should handle it - # htmx response: OOB-swap mini cart + product buttons if request.headers.get("HX-Request") == "true": return await render_template( "_types/product/_added.html", cart=g.cart, - item=ci, + item=ci_ns, ) # normal POST: go to cart page diff --git a/market/scrape/auth.py b/market/scrape/auth.py new file mode 100644 index 0000000..d6e379a --- /dev/null +++ b/market/scrape/auth.py @@ -0,0 +1,148 @@ +"""OAuth device flow authentication for the market scraper. + +Same flow as the artdag CLI client: +1. Request device code from account server +2. User approves in browser +3. Poll for access token +4. Save token to ~/.artdag/token.json (shared with artdag CLI) +""" +from __future__ import annotations + +import json +import os +import sys +import time +from pathlib import Path + +import httpx + +TOKEN_DIR = Path.home() / ".artdag" +TOKEN_FILE = TOKEN_DIR / "token.json" + +_DEFAULT_ACCOUNT_SERVER = "https://account.rose-ash.com" + + +def _get_account_server() -> str: + return os.getenv("ARTDAG_ACCOUNT", _DEFAULT_ACCOUNT_SERVER) + + +def load_token() -> dict: + """Load saved token from ~/.artdag/token.json.""" + if TOKEN_FILE.exists(): + try: + with open(TOKEN_FILE) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return {} + return {} + + +def save_token(token_data: dict): + """Save token to ~/.artdag/token.json (mode 0600).""" + TOKEN_DIR.mkdir(parents=True, exist_ok=True) + with open(TOKEN_FILE, "w") as f: + json.dump(token_data, f, indent=2) + TOKEN_FILE.chmod(0o600) + + +def get_access_token(require: bool = True) -> str | None: + """Return the saved access token, or None. + + When *require* is True and no token is found, triggers interactive + device-flow login (same as ``artdag login``). + """ + token = load_token().get("access_token") + if not token and require: + print("No saved token found — starting device-flow login...") + token_data = login() + token = token_data.get("access_token") + return token + + +def auth_headers() -> dict[str, str]: + """Return Authorization header dict for HTTP requests.""" + token = get_access_token(require=True) + return {"Authorization": f"Bearer {token}"} + + +def login() -> dict: + """Interactive device-flow login (blocking, prints to stdout). + + Returns the token data dict on success. Exits on failure. + """ + account = _get_account_server() + + # 1. Request device code + try: + with httpx.Client(timeout=10) as client: + resp = client.post( + f"{account}/auth/device/authorize", + json={"client_id": "artdag"}, + ) + resp.raise_for_status() + data = resp.json() + except httpx.HTTPError as e: + print(f"Login failed: {e}", file=sys.stderr) + sys.exit(1) + + device_code = data["device_code"] + user_code = data["user_code"] + verification_uri = data["verification_uri"] + expires_in = data.get("expires_in", 900) + interval = data.get("interval", 5) + + print("To sign in, open this URL in your browser:") + print(f" {verification_uri}") + print(f" and enter code: {user_code}") + print() + + # Try to open browser + try: + import webbrowser + webbrowser.open(verification_uri) + except Exception: + pass + + # 2. Poll for approval + print("Waiting for authorization", end="", flush=True) + deadline = time.time() + expires_in + + with httpx.Client(timeout=10) as client: + while time.time() < deadline: + time.sleep(interval) + print(".", end="", flush=True) + + try: + resp = client.post( + f"{account}/auth/device/token", + json={"device_code": device_code, "client_id": "artdag"}, + ) + data = resp.json() + except httpx.HTTPError: + continue + + error = data.get("error") + if error == "authorization_pending": + continue + elif error == "expired_token": + print("\nCode expired. Please try again.", file=sys.stderr) + sys.exit(1) + elif error == "access_denied": + print("\nAuthorization denied.", file=sys.stderr) + sys.exit(1) + elif error: + print(f"\nLogin failed: {error}", file=sys.stderr) + sys.exit(1) + + # Success + token_data = { + "access_token": data["access_token"], + "username": data.get("username", ""), + "display_name": data.get("display_name", ""), + } + save_token(token_data) + print(f"\nLogged in as {token_data['username'] or token_data['display_name']}") + return token_data + + print("\nTimed out waiting for authorization.", file=sys.stderr) + sys.exit(1) diff --git a/market/scrape/persist_api/capture_listing.py b/market/scrape/persist_api/capture_listing.py index 3943253..1104ec6 100644 --- a/market/scrape/persist_api/capture_listing.py +++ b/market/scrape/persist_api/capture_listing.py @@ -1,16 +1,16 @@ -# replace your existing upsert_product with this version - import os import httpx from typing import List +from ..auth import auth_headers + async def capture_listing( url: str, items: List[str], total_pages: int ): - + sync_url = os.getenv("CAPTURE_LISTING_URL", "http://localhost:8001/market/suma-market/api/products/listing/") async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client: @@ -19,7 +19,7 @@ async def capture_listing( "items": items, "total_pages": total_pages } - resp = await client.post(sync_url, json=_d) + resp = await client.post(sync_url, json=_d, headers=auth_headers()) # Raise for non-2xx resp.raise_for_status() data = resp.json() if resp.content else {} diff --git a/market/scrape/persist_api/log_product_result.py b/market/scrape/persist_api/log_product_result.py index bf285ed..d729b3e 100644 --- a/market/scrape/persist_api/log_product_result.py +++ b/market/scrape/persist_api/log_product_result.py @@ -1,14 +1,14 @@ -# replace your existing upsert_product with this version - import os import httpx +from ..auth import auth_headers + async def log_product_result( ok: bool, payload ): - + sync_url = os.getenv("PRODUCT_LOG_URL", "http://localhost:8000/market/api/products/log/") async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client: @@ -16,7 +16,7 @@ async def log_product_result( "ok": ok, "payload": payload } - resp = await client.post(sync_url, json=_d) + resp = await client.post(sync_url, json=_d, headers=auth_headers()) # Raise for non-2xx resp.raise_for_status() data = resp.json() if resp.content else {} diff --git a/market/scrape/persist_api/save_nav.py b/market/scrape/persist_api/save_nav.py index 3feeadb..a784d3d 100644 --- a/market/scrape/persist_api/save_nav.py +++ b/market/scrape/persist_api/save_nav.py @@ -1,17 +1,17 @@ -# replace your existing upsert_product with this version - import os import httpx from typing import Dict +from ..auth import auth_headers + async def save_nav( nav: Dict, ): sync_url = os.getenv("SAVE_NAV_URL", "http://localhost:8001/market/suma-market/api/products/nav/") async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client: - resp = await client.post(sync_url, json=nav) + resp = await client.post(sync_url, json=nav, headers=auth_headers()) # Raise for non-2xx resp.raise_for_status() data = resp.json() if resp.content else {} diff --git a/market/scrape/persist_api/save_subcategory_redirects.py b/market/scrape/persist_api/save_subcategory_redirects.py index 60eba97..006b76d 100644 --- a/market/scrape/persist_api/save_subcategory_redirects.py +++ b/market/scrape/persist_api/save_subcategory_redirects.py @@ -3,11 +3,13 @@ import httpx from typing import Dict +from ..auth import auth_headers + async def save_subcategory_redirects(mapping: Dict[str, str]) -> None: sync_url = os.getenv("SAVE_REDIRECTS", "http://localhost:8000/market/api/products/redirects/") async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client: - resp = await client.post(sync_url, json=mapping) + resp = await client.post(sync_url, json=mapping, headers=auth_headers()) # Raise for non-2xx resp.raise_for_status() data = resp.json() if resp.content else {} diff --git a/market/scrape/persist_api/upsert_product.py b/market/scrape/persist_api/upsert_product.py index d65149a..faed1dc 100644 --- a/market/scrape/persist_api/upsert_product.py +++ b/market/scrape/persist_api/upsert_product.py @@ -5,6 +5,8 @@ import httpx from typing import Dict, List, Any +from ..auth import auth_headers + async def upsert_product( slug, href, @@ -30,7 +32,7 @@ async def upsert_product( async def _do_call() -> Dict[str, Any]: async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client: - resp = await client.post(sync_url, json=payload) + resp = await client.post(sync_url, json=payload, headers=auth_headers()) resp.raise_for_status() # tolerate empty body if not resp.content: diff --git a/shared/infrastructure/internal_inbox.py b/shared/infrastructure/internal_inbox.py new file mode 100644 index 0000000..7a5ff20 --- /dev/null +++ b/shared/infrastructure/internal_inbox.py @@ -0,0 +1,49 @@ +"""Internal AP inbox dispatch for synchronous inter-service writes. + +Each service can register handlers for (activity_type, object_type) pairs. +When an internal AP activity arrives, it is routed to the matching handler. + +This mirrors the federated ``dispatch_inbox_activity()`` pattern but for +HMAC-authenticated internal service-to-service calls. +""" +from __future__ import annotations + +import logging +from typing import Callable, Awaitable + +from sqlalchemy.ext.asyncio import AsyncSession + +log = logging.getLogger(__name__) + +# Registry: (activity_type, object_type) → async handler(session, body) → dict +_internal_handlers: dict[tuple[str, str], Callable[[AsyncSession, dict], Awaitable[dict]]] = {} + + +def register_internal_handler( + activity_type: str, + object_type: str, + handler: Callable[[AsyncSession, dict], Awaitable[dict]], +) -> None: + """Register a handler for an internal AP activity type + object type pair.""" + key = (activity_type, object_type) + if key in _internal_handlers: + log.warning("Overwriting internal handler for %s", key) + _internal_handlers[key] = handler + + +async def dispatch_internal_activity(session: AsyncSession, body: dict) -> dict: + """Route an internal AP activity to the correct handler. + + Returns the handler result dict. Raises ValueError for unknown types. + """ + activity_type = body.get("type", "") + obj = body.get("object") or {} + object_type = obj.get("type", "") if isinstance(obj, dict) else "" + + key = (activity_type, object_type) + handler = _internal_handlers.get(key) + if handler is None: + raise ValueError(f"No internal handler for {key}") + + log.info("Dispatching internal activity %s", key) + return await handler(session, body) diff --git a/shared/infrastructure/internal_inbox_client.py b/shared/infrastructure/internal_inbox_client.py new file mode 100644 index 0000000..8b36d4c --- /dev/null +++ b/shared/infrastructure/internal_inbox_client.py @@ -0,0 +1,84 @@ +"""Client for sending internal AP activities to other services. + +Replaces ``call_action`` for AP-shaped inter-service writes. +Uses the same HMAC authentication as actions/data clients. +""" +from __future__ import annotations + +import logging +import os + +import httpx + +from shared.infrastructure.internal_auth import sign_internal_headers + +log = logging.getLogger(__name__) + +_client: httpx.AsyncClient | None = None + +_DEFAULT_TIMEOUT = 5.0 + +INBOX_HEADER = "X-Internal-Inbox" + + +class InboxError(Exception): + """Raised when an internal inbox call fails.""" + + def __init__(self, message: str, status_code: int = 500, detail: dict | None = None): + super().__init__(message) + self.status_code = status_code + self.detail = detail + + +def _get_client() -> httpx.AsyncClient: + global _client + if _client is None or _client.is_closed: + _client = httpx.AsyncClient( + timeout=httpx.Timeout(_DEFAULT_TIMEOUT), + follow_redirects=False, + ) + return _client + + +def _internal_url(app_name: str) -> str: + env_key = f"INTERNAL_URL_{app_name.upper()}" + return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/") + + +async def send_internal_activity( + app_name: str, + activity: dict, + *, + timeout: float = _DEFAULT_TIMEOUT, +) -> dict: + """POST an AP activity to the target service's /internal/inbox. + + Returns the parsed JSON response on 2xx. + Raises ``InboxError`` on network errors or non-2xx responses. + """ + base = _internal_url(app_name) + url = f"{base}/internal/inbox" + try: + headers = {INBOX_HEADER: "1", **sign_internal_headers(app_name)} + resp = await _get_client().post( + url, + json=activity, + headers=headers, + timeout=timeout, + ) + if 200 <= resp.status_code < 300: + return resp.json() + msg = f"Inbox {app_name} returned {resp.status_code}" + detail = None + try: + detail = resp.json() + except Exception: + pass + log.error(msg) + raise InboxError(msg, status_code=resp.status_code, detail=detail) + except InboxError: + raise + except Exception as exc: + msg = f"Inbox {app_name} failed: {exc}" + log.error(msg) + raise InboxError(msg) from exc diff --git a/shared/models/market.py b/shared/models/market.py index 9c68c20..587632a 100644 --- a/shared/models/market.py +++ b/shared/models/market.py @@ -410,19 +410,18 @@ class CartItem(Base): nullable=True, ) - # Cross-domain relationships — explicit join, viewonly (no FK constraint) - market_place: Mapped["MarketPlace | None"] = relationship( - "MarketPlace", - primaryjoin="CartItem.market_place_id == MarketPlace.id", - foreign_keys="[CartItem.market_place_id]", - viewonly=True, - ) - product: Mapped["Product"] = relationship( - "Product", - primaryjoin="CartItem.product_id == Product.id", - foreign_keys="[CartItem.product_id]", - viewonly=True, - ) + # Denormalized product data (snapshotted at write time) + product_title: Mapped[str | None] = mapped_column(String(512), nullable=True) + product_slug: Mapped[str | None] = mapped_column(String(512), nullable=True) + product_image: Mapped[str | None] = mapped_column(Text, nullable=True) + product_brand: Mapped[str | None] = mapped_column(String(255), nullable=True) + product_regular_price: Mapped[float | None] = mapped_column(Numeric(12, 2), nullable=True) + product_special_price: Mapped[float | None] = mapped_column(Numeric(12, 2), nullable=True) + product_price_currency: Mapped[str | None] = mapped_column(String(16), nullable=True) + + # Denormalized marketplace data (snapshotted at write time) + market_place_name: Mapped[str | None] = mapped_column(String(255), nullable=True) + market_place_container_id: Mapped[int | None] = mapped_column(Integer, nullable=True) __table_args__ = ( Index("ix_cart_items_user_product", "user_id", "product_id"), diff --git a/shared/models/order.py b/shared/models/order.py index 1fda0f7..04814ad 100644 --- a/shared/models/order.py +++ b/shared/models/order.py @@ -87,6 +87,8 @@ class OrderItem(Base): nullable=False, ) product_title: Mapped[Optional[str]] = mapped_column(String(512), nullable=True) + product_slug: Mapped[Optional[str]] = mapped_column(String(512), nullable=True) + product_image: Mapped[Optional[str]] = mapped_column(Text, nullable=True) quantity: Mapped[int] = mapped_column(Integer, nullable=False, default=1) unit_price: Mapped[float] = mapped_column(Numeric(12, 2), nullable=False) @@ -102,12 +104,3 @@ class OrderItem(Base): "Order", back_populates="items", ) - - # Cross-domain relationship — explicit join, viewonly (no FK constraint) - product: Mapped["Product"] = relationship( - "Product", - primaryjoin="OrderItem.product_id == Product.id", - foreign_keys="[OrderItem.product_id]", - viewonly=True, - lazy="selectin", - )