Decouple cart/market DBs: denormalize product data, AP internal inbox, OAuth scraper auth

Remove cross-DB relationships (CartItem.product, CartItem.market_place,
OrderItem.product) that break with per-service databases. Denormalize
product and marketplace fields onto cart_items/order_items at write time.

- Add AP internal inbox infrastructure (shared/infrastructure/internal_inbox*)
  for synchronous inter-service writes via HMAC-authenticated POST
- Cart inbox blueprint handles Add/Remove/Update rose:CartItem activities
- Market app sends AP activities to cart inbox instead of writing CartItem directly
- Cart services use denormalized columns instead of cross-DB hydration/joins
- Add marketplaces-by-ids data endpoint to market service
- Alembic migration adds denormalized columns to cart_items and order_items
- Add OAuth device flow auth to market scraper persist_api (artdag client pattern)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-26 14:49:04 +00:00
parent cf7fbd8e9b
commit 81112c716b
28 changed files with 739 additions and 186 deletions

View File

@@ -2,6 +2,10 @@
Federated content-addressed DAG execution engine for distributed media processing with ActivityPub ownership and provenance tracking.
## Deployment
- **Do NOT push** until explicitly told to. Pushes reload code to dev automatically.
## Project Structure
```
@@ -54,6 +58,9 @@ docker build -f l2/Dockerfile -t l2-server:latest .
- **Celery Tasks**: In `l1/tasks/`, decorated with `@app.task`
- **S-Expression Effects**: Composable effect language in `l1/sexp_effects/`
- **Storage**: Local filesystem, S3, or IPFS backends (`storage_providers.py`)
- **Inter-Service Reads**: `fetch_data()` → GET `/internal/data/{query}` (HMAC-signed)
- **Inter-Service Actions**: `call_action()` → POST `/internal/actions/{name}` (HMAC-signed)
- **Inter-Service AP Inbox**: `send_internal_activity()` → POST `/internal/inbox` (HMAC-signed, AP-shaped activities for cross-service writes with denormalized data)
## Auth

View File

@@ -0,0 +1,48 @@
"""Denormalize product and marketplace data onto cart_items and order_items.
Revision ID: cart_0002
Revises: cart_0001
Create Date: 2026-02-26
"""
import sqlalchemy as sa
from alembic import op
revision = "cart_0002"
down_revision = "cart_0001"
branch_labels = None
depends_on = None
def upgrade():
# -- cart_items: denormalized product data --
op.add_column("cart_items", sa.Column("product_title", sa.String(512), nullable=True))
op.add_column("cart_items", sa.Column("product_slug", sa.String(512), nullable=True))
op.add_column("cart_items", sa.Column("product_image", sa.Text, nullable=True))
op.add_column("cart_items", sa.Column("product_brand", sa.String(255), nullable=True))
op.add_column("cart_items", sa.Column("product_regular_price", sa.Numeric(12, 2), nullable=True))
op.add_column("cart_items", sa.Column("product_special_price", sa.Numeric(12, 2), nullable=True))
op.add_column("cart_items", sa.Column("product_price_currency", sa.String(16), nullable=True))
# -- cart_items: denormalized marketplace data --
op.add_column("cart_items", sa.Column("market_place_name", sa.String(255), nullable=True))
op.add_column("cart_items", sa.Column("market_place_container_id", sa.Integer, nullable=True))
# -- order_items: denormalized product fields --
op.add_column("order_items", sa.Column("product_slug", sa.String(512), nullable=True))
op.add_column("order_items", sa.Column("product_image", sa.Text, nullable=True))
def downgrade():
op.drop_column("order_items", "product_image")
op.drop_column("order_items", "product_slug")
op.drop_column("cart_items", "market_place_container_id")
op.drop_column("cart_items", "market_place_name")
op.drop_column("cart_items", "product_price_currency")
op.drop_column("cart_items", "product_special_price")
op.drop_column("cart_items", "product_regular_price")
op.drop_column("cart_items", "product_brand")
op.drop_column("cart_items", "product_image")
op.drop_column("cart_items", "product_slug")
op.drop_column("cart_items", "product_title")

View File

@@ -19,6 +19,7 @@ from bp import (
register_fragments,
register_actions,
register_data,
register_inbox,
)
from bp.cart.services import (
get_cart,
@@ -143,6 +144,7 @@ def create_app() -> "Quart":
app.register_blueprint(register_fragments())
app.register_blueprint(register_actions())
app.register_blueprint(register_data())
app.register_blueprint(register_inbox())
# --- Page slug hydration (follows events/market app pattern) ---

View File

@@ -6,3 +6,4 @@ from .orders.routes import register as register_orders
from .fragments import register_fragments
from .actions import register_actions
from .data import register_data
from .inbox import register_inbox

View File

@@ -7,7 +7,6 @@ from sqlalchemy import select
from shared.models.market import CartItem
from shared.models.order import Order
from shared.models.market_place import MarketPlace
from shared.infrastructure.actions import call_action
from .services import (
current_cart_identity,
@@ -265,16 +264,14 @@ def register(url_prefix: str) -> Blueprint:
required=False) if raw_pc else None
if post:
g.page_slug = post["slug"]
result = await g.s.execute(
select(MarketPlace).where(
MarketPlace.container_type == "page",
MarketPlace.container_id == post["id"],
MarketPlace.deleted_at.is_(None),
).limit(1)
)
mp = result.scalar_one_or_none()
if mp:
g.market_slug = mp.slug
# Fetch marketplace slug from market service
mps = await fetch_data(
"market", "marketplaces-for-container",
params={"type": "page", "id": post["id"]},
required=False,
) or []
if mps:
g.market_slug = mps[0].get("slug")
if order.sumup_checkout_id:
try:

View File

@@ -9,9 +9,8 @@ from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from shared.models.market import Product, CartItem
from shared.models.market import CartItem
from shared.models.order import Order, OrderItem
from shared.models.market_place import MarketPlace
from shared.config import config
from shared.contracts.dtos import CalendarEntryDTO
from shared.events import emit_activity
@@ -24,17 +23,24 @@ async def find_or_create_cart_item(
product_id: int,
user_id: Optional[int],
session_id: Optional[str],
*,
product_title: str | None = None,
product_slug: str | None = None,
product_image: str | None = None,
product_brand: str | None = None,
product_regular_price: float | None = None,
product_special_price: float | None = None,
product_price_currency: str | None = None,
market_place_id: int | None = None,
market_place_name: str | None = None,
market_place_container_id: int | None = None,
) -> Optional[CartItem]:
"""
Find an existing cart item for this product/identity, or create a new one.
Returns None if the product doesn't exist.
Returns None if product data is missing.
Increments quantity if item already exists.
"""
# Make sure product exists
product = await session.scalar(
select(Product).where(Product.id == product_id)
)
if not product:
if not product_id:
return None
# Look for existing cart item
@@ -56,8 +62,18 @@ async def find_or_create_cart_item(
cart_item = CartItem(
user_id=user_id,
session_id=session_id,
product_id=product.id,
product_id=product_id,
quantity=1,
market_place_id=market_place_id,
product_title=product_title,
product_slug=product_slug,
product_image=product_image,
product_brand=product_brand,
product_regular_price=product_regular_price,
product_special_price=product_special_price,
product_price_currency=product_price_currency,
market_place_name=market_place_name,
market_place_container_id=market_place_container_id,
)
session.add(cart_item)
return cart_item
@@ -76,12 +92,10 @@ async def resolve_page_config(
"""
post_ids: set[int] = set()
# From cart items via market_place
# From cart items via denormalized market_place_container_id
for ci in cart:
if ci.market_place_id:
mp = await session.get(MarketPlace, ci.market_place_id)
if mp:
post_ids.add(mp.container_id)
if ci.market_place_container_id:
post_ids.add(ci.market_place_container_id)
# From calendar entries via calendar
for entry in calendar_entries:
@@ -130,8 +144,7 @@ async def create_order_from_cart(
cart_total = product_total + calendar_total + ticket_total
# Determine currency from first product
first_product = cart[0].product if cart else None
currency = (first_product.regular_price_currency if first_product else None) or "GBP"
currency = (cart[0].product_price_currency if cart else None) or "GBP"
# Create order
order = Order(
@@ -146,11 +159,13 @@ async def create_order_from_cart(
# Create order items from cart
for ci in cart:
price = ci.product.special_price or ci.product.regular_price or 0
price = ci.product_special_price or ci.product_regular_price or 0
oi = OrderItem(
order=order,
product_id=ci.product.id,
product_title=ci.product.title,
product_id=ci.product_id,
product_title=ci.product_title,
product_slug=ci.product_slug,
product_image=ci.product_image,
quantity=ci.quantity,
unit_price=price,
currency=currency,
@@ -188,7 +203,7 @@ async def create_order_from_cart(
def build_sumup_description(cart: list[CartItem], order_id: int, *, ticket_count: int = 0) -> str:
"""Build a human-readable description for SumUp checkout."""
titles = [ci.product.title for ci in cart if ci.product and ci.product.title]
titles = [ci.product_title for ci in cart if ci.product_title]
item_count = sum(ci.quantity for ci in cart)
parts = []
@@ -240,11 +255,11 @@ def validate_webhook_secret(token: Optional[str]) -> bool:
async def get_order_with_details(session: AsyncSession, order_id: int) -> Optional[Order]:
"""Fetch an order with items and calendar entries eagerly loaded."""
"""Fetch an order with items eagerly loaded."""
result = await session.execute(
select(Order)
.options(
selectinload(Order.items).selectinload(OrderItem.product),
selectinload(Order.items),
)
.where(Order.id == order_id)
)

View File

@@ -1,7 +1,6 @@
from sqlalchemy import update, func, select
from sqlalchemy import update, func
from shared.models.market import CartItem
from shared.models.market_place import MarketPlace
from shared.models.order import Order
@@ -23,12 +22,7 @@ async def clear_cart_for_order(session, order: Order, *, page_post_id: int | Non
return
if page_post_id is not None:
mp_ids = select(MarketPlace.id).where(
MarketPlace.container_type == "page",
MarketPlace.container_id == page_post_id,
MarketPlace.deleted_at.is_(None),
).scalar_subquery()
filters.append(CartItem.market_place_id.in_(mp_ids))
filters.append(CartItem.market_place_container_id == page_post_id)
await session.execute(
update(CartItem)

View File

@@ -1,9 +1,37 @@
from types import SimpleNamespace
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from shared.models.market import CartItem
from .identity import current_cart_identity
def _attach_product_namespace(ci: CartItem) -> None:
"""Build a SimpleNamespace 'product' from denormalized columns for template compat."""
ci.product = SimpleNamespace(
id=ci.product_id,
title=ci.product_title,
slug=ci.product_slug,
image=ci.product_image,
brand=ci.product_brand,
regular_price=ci.product_regular_price,
special_price=ci.product_special_price,
regular_price_currency=ci.product_price_currency,
)
def _attach_market_place_namespace(ci: CartItem) -> None:
"""Build a SimpleNamespace 'market_place' from denormalized columns."""
if ci.market_place_id:
ci.market_place = SimpleNamespace(
id=ci.market_place_id,
name=ci.market_place_name,
container_id=ci.market_place_container_id,
)
else:
ci.market_place = None
async def get_cart(session):
ident = current_cart_identity()
@@ -17,9 +45,11 @@ async def get_cart(session):
select(CartItem)
.where(*filters)
.order_by(CartItem.created_at.desc())
.options(
selectinload(CartItem.product),
selectinload(CartItem.market_place),
)
)
return result.scalars().all()
items = list(result.scalars().all())
for ci in items:
_attach_product_namespace(ci)
_attach_market_place_namespace(ci)
return items

View File

@@ -2,7 +2,7 @@
Page-scoped cart queries.
Groups cart items and calendar entries by their owning page (Post),
determined via CartItem.market_place.container_id and CalendarEntry.calendar.container_id
determined via CartItem.market_place_container_id
(where container_type == "page").
"""
from __future__ import annotations
@@ -12,24 +12,21 @@ from collections import defaultdict
from types import SimpleNamespace
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from shared.models.market import CartItem
from shared.models.market_place import MarketPlace
from shared.infrastructure.data_client import fetch_data
from shared.contracts.dtos import CalendarEntryDTO, TicketDTO, PostDTO, dto_from_dict
from .identity import current_cart_identity
from .get_cart import _attach_product_namespace, _attach_market_place_namespace
async def get_cart_for_page(session, post_id: int) -> list[CartItem]:
"""Return cart items scoped to a specific page (via MarketPlace.container_id)."""
"""Return cart items scoped to a specific page (via denormalized market_place_container_id)."""
ident = current_cart_identity()
filters = [
CartItem.deleted_at.is_(None),
MarketPlace.container_type == "page",
MarketPlace.container_id == post_id,
MarketPlace.deleted_at.is_(None),
CartItem.market_place_container_id == post_id,
]
if ident["user_id"] is not None:
filters.append(CartItem.user_id == ident["user_id"])
@@ -38,15 +35,16 @@ async def get_cart_for_page(session, post_id: int) -> list[CartItem]:
result = await session.execute(
select(CartItem)
.join(MarketPlace, CartItem.market_place_id == MarketPlace.id)
.where(*filters)
.order_by(CartItem.created_at.desc())
.options(
selectinload(CartItem.product),
selectinload(CartItem.market_place),
)
)
return result.scalars().all()
items = list(result.scalars().all())
for ci in items:
_attach_product_namespace(ci)
_attach_market_place_namespace(ci)
return items
async def get_calendar_entries_for_page(session, post_id: int):

View File

@@ -4,10 +4,9 @@ from decimal import Decimal
def total(cart):
return sum(
(
Decimal(str(item.product.special_price or item.product.regular_price))
Decimal(str(item.product_special_price or item.product_regular_price))
* item.quantity
)
for item in cart
if (item.product.special_price or item.product.regular_price) is not None
if (item.product_special_price or item.product_regular_price) is not None
)

View File

@@ -83,7 +83,6 @@ def register() -> Blueprint:
# --- cart-items (product slugs + quantities for template rendering) ---
async def _cart_items():
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from shared.models.market import CartItem
user_id = request.args.get("user_id", type=int)
@@ -98,13 +97,13 @@ def register() -> Blueprint:
return []
result = await g.s.execute(
select(CartItem).where(*filters).options(selectinload(CartItem.product))
select(CartItem).where(*filters)
)
items = result.scalars().all()
return [
{
"product_id": item.product_id,
"product_slug": item.product.slug if item.product else None,
"product_slug": item.product_slug,
"quantity": item.quantity,
}
for item in items

View File

@@ -0,0 +1 @@
from .routes import register as register_inbox

161
cart/bp/inbox/routes.py Normal file
View File

@@ -0,0 +1,161 @@
"""Cart internal inbox endpoint.
Receives AP-shaped activities from other services via HMAC-authenticated
POST to ``/internal/inbox``. Routes to handlers registered via the
internal inbox dispatch infrastructure.
"""
from __future__ import annotations
import logging
from datetime import datetime, timezone
from quart import Blueprint, g, jsonify, request
from sqlalchemy import select
from shared.infrastructure.internal_inbox import dispatch_internal_activity, register_internal_handler
from shared.infrastructure.internal_inbox_client import INBOX_HEADER
from shared.models.market import CartItem
log = logging.getLogger(__name__)
def register() -> Blueprint:
bp = Blueprint("inbox", __name__, url_prefix="/internal/inbox")
@bp.before_request
async def _require_inbox_header():
if not request.headers.get(INBOX_HEADER):
return jsonify({"error": "forbidden"}), 403
from shared.infrastructure.internal_auth import validate_internal_request
if not validate_internal_request():
return jsonify({"error": "forbidden"}), 403
@bp.post("")
async def handle_inbox():
body = await request.get_json()
if not body:
return jsonify({"error": "empty body"}), 400
try:
result = await dispatch_internal_activity(g.s, body)
return jsonify(result)
except ValueError as exc:
return jsonify({"error": str(exc)}), 400
except Exception as exc:
log.exception("Internal inbox dispatch failed")
return jsonify({"error": str(exc)}), 500
# --- Handler: Add rose:CartItem ---
async def _handle_add_cart_item(session, body: dict) -> dict:
obj = body["object"]
user_id = obj.get("user_id")
session_id = obj.get("session_id")
product_id = obj["product_id"]
count = obj.get("quantity", 1)
market_place_id = obj.get("market_place_id")
# Look for existing cart item
filters = [
CartItem.deleted_at.is_(None),
CartItem.product_id == product_id,
]
if user_id is not None:
filters.append(CartItem.user_id == user_id)
else:
filters.append(CartItem.session_id == session_id)
existing = await session.scalar(select(CartItem).where(*filters))
if existing:
if count > 0:
existing.quantity = count
else:
existing.deleted_at = datetime.now(timezone.utc)
ci = existing
else:
if count <= 0:
return {"ok": True, "action": "noop"}
ci = CartItem(
user_id=user_id,
session_id=session_id,
product_id=product_id,
quantity=count,
market_place_id=market_place_id,
# Denormalized product data
product_title=obj.get("product_title"),
product_slug=obj.get("product_slug"),
product_image=obj.get("product_image"),
product_brand=obj.get("product_brand"),
product_regular_price=obj.get("product_regular_price"),
product_special_price=obj.get("product_special_price"),
product_price_currency=obj.get("product_price_currency"),
# Denormalized marketplace data
market_place_name=obj.get("market_place_name"),
market_place_container_id=obj.get("market_place_container_id"),
)
session.add(ci)
await session.flush()
return {
"ok": True,
"cart_item_id": ci.id,
"quantity": ci.quantity,
}
register_internal_handler("Add", "rose:CartItem", _handle_add_cart_item)
# --- Handler: Remove rose:CartItem ---
async def _handle_remove_cart_item(session, body: dict) -> dict:
obj = body["object"]
user_id = obj.get("user_id")
session_id = obj.get("session_id")
product_id = obj["product_id"]
filters = [
CartItem.deleted_at.is_(None),
CartItem.product_id == product_id,
]
if user_id is not None:
filters.append(CartItem.user_id == user_id)
else:
filters.append(CartItem.session_id == session_id)
existing = await session.scalar(select(CartItem).where(*filters))
if existing:
existing.deleted_at = datetime.now(timezone.utc)
await session.flush()
return {"ok": True}
register_internal_handler("Remove", "rose:CartItem", _handle_remove_cart_item)
# --- Handler: Update rose:CartItem ---
async def _handle_update_cart_item(session, body: dict) -> dict:
obj = body["object"]
user_id = obj.get("user_id")
session_id = obj.get("session_id")
product_id = obj["product_id"]
quantity = obj.get("quantity", 1)
filters = [
CartItem.deleted_at.is_(None),
CartItem.product_id == product_id,
]
if user_id is not None:
filters.append(CartItem.user_id == user_id)
else:
filters.append(CartItem.session_id == session_id)
existing = await session.scalar(select(CartItem).where(*filters))
if existing:
if quantity <= 0:
existing.deleted_at = datetime.now(timezone.utc)
else:
existing.quantity = quantity
await session.flush()
return {"ok": True, "quantity": existing.quantity}
return {"ok": True, "action": "noop"}
register_internal_handler("Update", "rose:CartItem", _handle_update_cart_item)
return bp

View File

@@ -5,7 +5,6 @@ from sqlalchemy import select, func, or_, cast, String, exists
from sqlalchemy.orm import selectinload
from shared.models.market import Product
from shared.models.order import Order, OrderItem
from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout
from shared.config import config
@@ -49,7 +48,7 @@ def register() -> Blueprint:
result = await g.s.execute(
select(Order)
.options(
selectinload(Order.items).selectinload(OrderItem.product)
selectinload(Order.items)
)
.where(Order.id == order_id, owner)
)

View File

@@ -5,7 +5,6 @@ from sqlalchemy import select, func, or_, cast, String, exists
from sqlalchemy.orm import selectinload
from shared.models.market import Product
from shared.models.order import Order, OrderItem
from shared.browser.app.payments.sumup import create_checkout as sumup_create_checkout
from shared.config import config
@@ -86,16 +85,11 @@ def register(url_prefix: str) -> Blueprint:
exists(
select(1)
.select_from(OrderItem)
.join(Product, Product.id == OrderItem.product_id)
.where(
OrderItem.order_id == Order.id,
or_(
OrderItem.product_title.ilike(term),
Product.title.ilike(term),
Product.description_short.ilike(term),
Product.description_html.ilike(term),
Product.slug.ilike(term),
Product.brand.ilike(term),
OrderItem.product_slug.ilike(term),
),
)
)

View File

@@ -7,13 +7,13 @@
<ul class="divide-y divide-stone-100 text-xs sm:text-sm">
{% for item in order.items %}
<li>
<a class="w-full py-2 flex gap-3" href="{{ market_product_url(item.product.slug) }}">
<a class="w-full py-2 flex gap-3" href="{{ market_product_url(item.product_slug) }}">
{# Thumbnail #}
<div class="w-12 h-12 sm:w-14 sm:h-14 rounded-md bg-stone-100 flex-shrink-0 overflow-hidden">
{% if item.product and item.product.image %}
{% if item.product_image %}
<img
src="{{ item.product.image }}"
alt="{{ item.product_title or item.product.title or 'Product image' }}"
src="{{ item.product_image }}"
alt="{{ item.product_title or 'Product image' }}"
class="w-full h-full object-contain object-center"
loading="lazy"
decoding="async"
@@ -29,7 +29,7 @@
<div class="flex-1 flex justify-between gap-3">
<div>
<p class="font-medium">
{{ item.product_title or (item.product and item.product.title) or 'Unknown product' }}
{{ item.product_title or 'Unknown product' }}
</p>
<p class="text-[11px] text-stone-500">
Product ID: {{ item.product_id }}

View File

@@ -76,4 +76,35 @@ def register() -> Blueprint:
_handlers["products-by-ids"] = _products_by_ids
# --- marketplaces-by-ids ---
async def _marketplaces_by_ids():
"""Return marketplace data for a list of IDs (comma-separated)."""
from sqlalchemy import select
from shared.models.market_place import MarketPlace
ids_raw = request.args.get("ids", "")
try:
ids = [int(x) for x in ids_raw.split(",") if x.strip()]
except ValueError:
return {"error": "ids must be comma-separated integers"}, 400
if not ids:
return []
rows = (await g.s.execute(
select(MarketPlace).where(MarketPlace.id.in_(ids))
)).scalars().all()
return [
{
"id": m.id,
"name": m.name,
"slug": m.slug,
"container_type": m.container_type,
"container_id": m.container_id,
}
for m in rows
]
_handlers["marketplaces-by-ids"] = _marketplaces_by_ids
return bp

View File

@@ -162,29 +162,25 @@ def register():
from bp.cart.services.identity import current_cart_identity
#from bp.cart.routes import view_cart
from models.market import CartItem
from quart import request, url_for
from shared.infrastructure.internal_inbox_client import send_internal_activity
from shared.infrastructure.data_client import fetch_data
@bp.post("/cart/")
@clear_cache(tag="browse", tag_scope="user")
async def cart():
slug = g.product_slug
# make sure product exists (we *allow* deleted_at != None later if you want)
product_id = await g.s.scalar(
select(Product.id).where(
# Load product from local db_market
product = await g.s.scalar(
select(Product).where(
Product.slug == slug,
Product.deleted_at.is_(None),
)
)
product = await g.s.scalar(
select(Product).where(Product.id == product_id)
)
if not product:
return await make_response("Product not found", 404)
# --- NEW: read `count` from body (JSON or form), default to 1 ---
# Read `count` from body (JSON or form), default to 1
count = 1
try:
if request.is_json:
@@ -196,68 +192,72 @@ def register():
if "count" in form:
count = int(form["count"])
except (ValueError, TypeError):
# if parsing fails, just fall back to 1
count = 1
# --- END NEW ---
ident = current_cart_identity()
market = getattr(g, "market", None)
# Load cart items for current user/session
from sqlalchemy.orm import selectinload
cart_filters = [CartItem.deleted_at.is_(None)]
if ident["user_id"] is not None:
cart_filters.append(CartItem.user_id == ident["user_id"])
else:
cart_filters.append(CartItem.session_id == ident["session_id"])
cart_result = await g.s.execute(
select(CartItem)
.where(*cart_filters)
.order_by(CartItem.created_at.desc())
.options(
selectinload(CartItem.product),
selectinload(CartItem.market_place),
)
)
g.cart = list(cart_result.scalars().all())
# Build AP activity with denormalized product data
activity_type = "Add" if count > 0 else "Remove"
activity = {
"type": activity_type,
"object": {
"type": "rose:CartItem",
"user_id": ident["user_id"],
"session_id": ident["session_id"],
"product_id": product.id,
"quantity": count,
"market_place_id": market.id if market else None,
# Denormalized product data
"product_title": product.title,
"product_slug": product.slug,
"product_image": product.image,
"product_brand": product.brand,
"product_regular_price": str(product.regular_price) if product.regular_price is not None else None,
"product_special_price": str(product.special_price) if product.special_price is not None else None,
"product_price_currency": product.regular_price_currency,
# Denormalized marketplace data
"market_place_name": market.name if market else None,
"market_place_container_id": market.container_id if market else None,
},
}
ci = next(
(item for item in g.cart if item.product_id == product_id),
await send_internal_activity("cart", activity)
# Fetch updated cart items from cart service for template rendering
raw_cart = await fetch_data(
"cart", "cart-items",
params={
k: v for k, v in {
"user_id": ident["user_id"],
"session_id": ident["session_id"],
}.items() if v is not None
},
required=False,
) or []
# Build minimal cart list for template (product slug + quantity)
from types import SimpleNamespace
g.cart = [
SimpleNamespace(
product_id=ci["product_id"],
product=SimpleNamespace(slug=ci["product_slug"]),
quantity=ci["quantity"],
)
for ci in raw_cart
]
ci_ns = next(
(item for item in g.cart if item.product_id == product.id),
None,
)
# --- NEW: set quantity based on `count` ---
if ci:
if count > 0:
ci.quantity = count
else:
# count <= 0 → remove from cart entirely
ci.quantity=0
g.cart.remove(ci)
await g.s.delete(ci)
else:
if count > 0:
ci = CartItem(
user_id=ident["user_id"],
session_id=ident["session_id"],
product_id=product.id,
product=product,
quantity=count,
market_place_id=getattr(g, "market", None) and g.market.id,
)
g.cart.append(ci)
g.s.add(ci)
# if count <= 0 and no existing item, do nothing
# --- END NEW ---
# no explicit commit; your session middleware should handle it
# htmx response: OOB-swap mini cart + product buttons
if request.headers.get("HX-Request") == "true":
return await render_template(
"_types/product/_added.html",
cart=g.cart,
item=ci,
item=ci_ns,
)
# normal POST: go to cart page

148
market/scrape/auth.py Normal file
View File

@@ -0,0 +1,148 @@
"""OAuth device flow authentication for the market scraper.
Same flow as the artdag CLI client:
1. Request device code from account server
2. User approves in browser
3. Poll for access token
4. Save token to ~/.artdag/token.json (shared with artdag CLI)
"""
from __future__ import annotations
import json
import os
import sys
import time
from pathlib import Path
import httpx
TOKEN_DIR = Path.home() / ".artdag"
TOKEN_FILE = TOKEN_DIR / "token.json"
_DEFAULT_ACCOUNT_SERVER = "https://account.rose-ash.com"
def _get_account_server() -> str:
return os.getenv("ARTDAG_ACCOUNT", _DEFAULT_ACCOUNT_SERVER)
def load_token() -> dict:
"""Load saved token from ~/.artdag/token.json."""
if TOKEN_FILE.exists():
try:
with open(TOKEN_FILE) as f:
return json.load(f)
except (json.JSONDecodeError, IOError):
return {}
return {}
def save_token(token_data: dict):
"""Save token to ~/.artdag/token.json (mode 0600)."""
TOKEN_DIR.mkdir(parents=True, exist_ok=True)
with open(TOKEN_FILE, "w") as f:
json.dump(token_data, f, indent=2)
TOKEN_FILE.chmod(0o600)
def get_access_token(require: bool = True) -> str | None:
"""Return the saved access token, or None.
When *require* is True and no token is found, triggers interactive
device-flow login (same as ``artdag login``).
"""
token = load_token().get("access_token")
if not token and require:
print("No saved token found — starting device-flow login...")
token_data = login()
token = token_data.get("access_token")
return token
def auth_headers() -> dict[str, str]:
"""Return Authorization header dict for HTTP requests."""
token = get_access_token(require=True)
return {"Authorization": f"Bearer {token}"}
def login() -> dict:
"""Interactive device-flow login (blocking, prints to stdout).
Returns the token data dict on success. Exits on failure.
"""
account = _get_account_server()
# 1. Request device code
try:
with httpx.Client(timeout=10) as client:
resp = client.post(
f"{account}/auth/device/authorize",
json={"client_id": "artdag"},
)
resp.raise_for_status()
data = resp.json()
except httpx.HTTPError as e:
print(f"Login failed: {e}", file=sys.stderr)
sys.exit(1)
device_code = data["device_code"]
user_code = data["user_code"]
verification_uri = data["verification_uri"]
expires_in = data.get("expires_in", 900)
interval = data.get("interval", 5)
print("To sign in, open this URL in your browser:")
print(f" {verification_uri}")
print(f" and enter code: {user_code}")
print()
# Try to open browser
try:
import webbrowser
webbrowser.open(verification_uri)
except Exception:
pass
# 2. Poll for approval
print("Waiting for authorization", end="", flush=True)
deadline = time.time() + expires_in
with httpx.Client(timeout=10) as client:
while time.time() < deadline:
time.sleep(interval)
print(".", end="", flush=True)
try:
resp = client.post(
f"{account}/auth/device/token",
json={"device_code": device_code, "client_id": "artdag"},
)
data = resp.json()
except httpx.HTTPError:
continue
error = data.get("error")
if error == "authorization_pending":
continue
elif error == "expired_token":
print("\nCode expired. Please try again.", file=sys.stderr)
sys.exit(1)
elif error == "access_denied":
print("\nAuthorization denied.", file=sys.stderr)
sys.exit(1)
elif error:
print(f"\nLogin failed: {error}", file=sys.stderr)
sys.exit(1)
# Success
token_data = {
"access_token": data["access_token"],
"username": data.get("username", ""),
"display_name": data.get("display_name", ""),
}
save_token(token_data)
print(f"\nLogged in as {token_data['username'] or token_data['display_name']}")
return token_data
print("\nTimed out waiting for authorization.", file=sys.stderr)
sys.exit(1)

View File

@@ -1,10 +1,10 @@
# replace your existing upsert_product with this version
import os
import httpx
from typing import List
from ..auth import auth_headers
async def capture_listing(
url: str,
items: List[str],
@@ -19,7 +19,7 @@ async def capture_listing(
"items": items,
"total_pages": total_pages
}
resp = await client.post(sync_url, json=_d)
resp = await client.post(sync_url, json=_d, headers=auth_headers())
# Raise for non-2xx
resp.raise_for_status()
data = resp.json() if resp.content else {}

View File

@@ -1,8 +1,8 @@
# replace your existing upsert_product with this version
import os
import httpx
from ..auth import auth_headers
async def log_product_result(
ok: bool,
@@ -16,7 +16,7 @@ async def log_product_result(
"ok": ok,
"payload": payload
}
resp = await client.post(sync_url, json=_d)
resp = await client.post(sync_url, json=_d, headers=auth_headers())
# Raise for non-2xx
resp.raise_for_status()
data = resp.json() if resp.content else {}

View File

@@ -1,17 +1,17 @@
# replace your existing upsert_product with this version
import os
import httpx
from typing import Dict
from ..auth import auth_headers
async def save_nav(
nav: Dict,
):
sync_url = os.getenv("SAVE_NAV_URL", "http://localhost:8001/market/suma-market/api/products/nav/")
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client:
resp = await client.post(sync_url, json=nav)
resp = await client.post(sync_url, json=nav, headers=auth_headers())
# Raise for non-2xx
resp.raise_for_status()
data = resp.json() if resp.content else {}

View File

@@ -3,11 +3,13 @@ import httpx
from typing import Dict
from ..auth import auth_headers
async def save_subcategory_redirects(mapping: Dict[str, str]) -> None:
sync_url = os.getenv("SAVE_REDIRECTS", "http://localhost:8000/market/api/products/redirects/")
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client:
resp = await client.post(sync_url, json=mapping)
resp = await client.post(sync_url, json=mapping, headers=auth_headers())
# Raise for non-2xx
resp.raise_for_status()
data = resp.json() if resp.content else {}

View File

@@ -5,6 +5,8 @@ import httpx
from typing import Dict, List, Any
from ..auth import auth_headers
async def upsert_product(
slug,
href,
@@ -30,7 +32,7 @@ async def upsert_product(
async def _do_call() -> Dict[str, Any]:
async with httpx.AsyncClient(timeout=httpx.Timeout(20.0, connect=10.0)) as client:
resp = await client.post(sync_url, json=payload)
resp = await client.post(sync_url, json=payload, headers=auth_headers())
resp.raise_for_status()
# tolerate empty body
if not resp.content:

View File

@@ -0,0 +1,49 @@
"""Internal AP inbox dispatch for synchronous inter-service writes.
Each service can register handlers for (activity_type, object_type) pairs.
When an internal AP activity arrives, it is routed to the matching handler.
This mirrors the federated ``dispatch_inbox_activity()`` pattern but for
HMAC-authenticated internal service-to-service calls.
"""
from __future__ import annotations
import logging
from typing import Callable, Awaitable
from sqlalchemy.ext.asyncio import AsyncSession
log = logging.getLogger(__name__)
# Registry: (activity_type, object_type) → async handler(session, body) → dict
_internal_handlers: dict[tuple[str, str], Callable[[AsyncSession, dict], Awaitable[dict]]] = {}
def register_internal_handler(
activity_type: str,
object_type: str,
handler: Callable[[AsyncSession, dict], Awaitable[dict]],
) -> None:
"""Register a handler for an internal AP activity type + object type pair."""
key = (activity_type, object_type)
if key in _internal_handlers:
log.warning("Overwriting internal handler for %s", key)
_internal_handlers[key] = handler
async def dispatch_internal_activity(session: AsyncSession, body: dict) -> dict:
"""Route an internal AP activity to the correct handler.
Returns the handler result dict. Raises ValueError for unknown types.
"""
activity_type = body.get("type", "")
obj = body.get("object") or {}
object_type = obj.get("type", "") if isinstance(obj, dict) else ""
key = (activity_type, object_type)
handler = _internal_handlers.get(key)
if handler is None:
raise ValueError(f"No internal handler for {key}")
log.info("Dispatching internal activity %s", key)
return await handler(session, body)

View File

@@ -0,0 +1,84 @@
"""Client for sending internal AP activities to other services.
Replaces ``call_action`` for AP-shaped inter-service writes.
Uses the same HMAC authentication as actions/data clients.
"""
from __future__ import annotations
import logging
import os
import httpx
from shared.infrastructure.internal_auth import sign_internal_headers
log = logging.getLogger(__name__)
_client: httpx.AsyncClient | None = None
_DEFAULT_TIMEOUT = 5.0
INBOX_HEADER = "X-Internal-Inbox"
class InboxError(Exception):
"""Raised when an internal inbox call fails."""
def __init__(self, message: str, status_code: int = 500, detail: dict | None = None):
super().__init__(message)
self.status_code = status_code
self.detail = detail
def _get_client() -> httpx.AsyncClient:
global _client
if _client is None or _client.is_closed:
_client = httpx.AsyncClient(
timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
follow_redirects=False,
)
return _client
def _internal_url(app_name: str) -> str:
env_key = f"INTERNAL_URL_{app_name.upper()}"
return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")
async def send_internal_activity(
app_name: str,
activity: dict,
*,
timeout: float = _DEFAULT_TIMEOUT,
) -> dict:
"""POST an AP activity to the target service's /internal/inbox.
Returns the parsed JSON response on 2xx.
Raises ``InboxError`` on network errors or non-2xx responses.
"""
base = _internal_url(app_name)
url = f"{base}/internal/inbox"
try:
headers = {INBOX_HEADER: "1", **sign_internal_headers(app_name)}
resp = await _get_client().post(
url,
json=activity,
headers=headers,
timeout=timeout,
)
if 200 <= resp.status_code < 300:
return resp.json()
msg = f"Inbox {app_name} returned {resp.status_code}"
detail = None
try:
detail = resp.json()
except Exception:
pass
log.error(msg)
raise InboxError(msg, status_code=resp.status_code, detail=detail)
except InboxError:
raise
except Exception as exc:
msg = f"Inbox {app_name} failed: {exc}"
log.error(msg)
raise InboxError(msg) from exc

View File

@@ -410,19 +410,18 @@ class CartItem(Base):
nullable=True,
)
# Cross-domain relationships — explicit join, viewonly (no FK constraint)
market_place: Mapped["MarketPlace | None"] = relationship(
"MarketPlace",
primaryjoin="CartItem.market_place_id == MarketPlace.id",
foreign_keys="[CartItem.market_place_id]",
viewonly=True,
)
product: Mapped["Product"] = relationship(
"Product",
primaryjoin="CartItem.product_id == Product.id",
foreign_keys="[CartItem.product_id]",
viewonly=True,
)
# Denormalized product data (snapshotted at write time)
product_title: Mapped[str | None] = mapped_column(String(512), nullable=True)
product_slug: Mapped[str | None] = mapped_column(String(512), nullable=True)
product_image: Mapped[str | None] = mapped_column(Text, nullable=True)
product_brand: Mapped[str | None] = mapped_column(String(255), nullable=True)
product_regular_price: Mapped[float | None] = mapped_column(Numeric(12, 2), nullable=True)
product_special_price: Mapped[float | None] = mapped_column(Numeric(12, 2), nullable=True)
product_price_currency: Mapped[str | None] = mapped_column(String(16), nullable=True)
# Denormalized marketplace data (snapshotted at write time)
market_place_name: Mapped[str | None] = mapped_column(String(255), nullable=True)
market_place_container_id: Mapped[int | None] = mapped_column(Integer, nullable=True)
__table_args__ = (
Index("ix_cart_items_user_product", "user_id", "product_id"),

View File

@@ -87,6 +87,8 @@ class OrderItem(Base):
nullable=False,
)
product_title: Mapped[Optional[str]] = mapped_column(String(512), nullable=True)
product_slug: Mapped[Optional[str]] = mapped_column(String(512), nullable=True)
product_image: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
quantity: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
unit_price: Mapped[float] = mapped_column(Numeric(12, 2), nullable=False)
@@ -102,12 +104,3 @@ class OrderItem(Base):
"Order",
back_populates="items",
)
# Cross-domain relationship — explicit join, viewonly (no FK constraint)
product: Mapped["Product"] = relationship(
"Product",
primaryjoin="OrderItem.product_id == Product.id",
foreign_keys="[OrderItem.product_id]",
viewonly=True,
lazy="selectin",
)