Remove cross-DB relationships (CartItem.product, CartItem.market_place, OrderItem.product) that break with per-service databases. Denormalize product and marketplace fields onto cart_items/order_items at write time.

- Add AP internal inbox infrastructure (shared/infrastructure/internal_inbox*) for synchronous inter-service writes via HMAC-authenticated POST
- Cart inbox blueprint handles Add/Remove/Update rose:CartItem activities
- Market app sends AP activities to cart inbox instead of writing CartItem directly
- Cart services use denormalized columns instead of cross-DB hydration/joins
- Add marketplaces-by-ids data endpoint to market service
- Alembic migration adds denormalized columns to cart_items and order_items
- Add OAuth device flow auth to market scraper persist_api (artdag client pattern)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
85 lines
2.3 KiB
Python
85 lines
2.3 KiB
Python
"""Client for sending internal AP activities to other services.
|
|
|
|
Replaces ``call_action`` for AP-shaped inter-service writes.
|
|
Uses the same HMAC authentication as actions/data clients.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
|
|
import httpx
|
|
|
|
from shared.infrastructure.internal_auth import sign_internal_headers
|
|
|
|
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
|
|
|
|
# Shared AsyncClient instance; stays None until _get_client() creates it.
_client: httpx.AsyncClient | None = None
|
|
|
|
# Timeout value given to httpx.Timeout for the shared client and used as the
# default ``timeout`` of send_internal_activity (httpx timeouts are in seconds).
_DEFAULT_TIMEOUT = 5.0
|
|
|
|
# Name of the header that send_internal_activity adds (with value "1") to
# every request it posts.
INBOX_HEADER = "X-Internal-Inbox"
|
|
|
|
|
|
class InboxError(Exception):
    """Raised when an internal inbox call fails.

    Attributes:
        status_code: Status associated with the failure; defaults to 500
            when the caller does not supply one.
        detail: Optional dict with extra error context, or ``None``.
    """

    def __init__(
        self,
        message: str,
        status_code: int = 500,
        detail: dict | None = None,
    ):
        # Record the extra context first, then let Exception store the message.
        self.status_code = status_code
        self.detail = detail
        super().__init__(message)
|
|
|
|
|
|
def _get_client() -> httpx.AsyncClient:
    """Return the shared ``httpx.AsyncClient``, creating it when needed.

    A new client is built when none exists yet or when the existing one
    reports ``is_closed``. The client uses ``_DEFAULT_TIMEOUT`` and does
    not follow redirects.
    """
    global _client
    existing = _client
    # Reuse the cached client whenever it is still usable.
    if existing is not None and not existing.is_closed:
        return existing
    _client = httpx.AsyncClient(
        timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
        follow_redirects=False,
    )
    return _client
|
|
|
|
|
|
def _internal_url(app_name: str) -> str:
|
|
env_key = f"INTERNAL_URL_{app_name.upper()}"
|
|
return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")
|
|
|
|
|
|
async def send_internal_activity(
    app_name: str,
    activity: dict,
    *,
    timeout: float = _DEFAULT_TIMEOUT,
) -> dict:
    """POST an AP activity to the target service's ``/internal/inbox``.

    Args:
        app_name: Target service name. Used to resolve the base URL via
            ``_internal_url`` and passed to ``sign_internal_headers``.
        activity: Activity payload, sent as the JSON request body.
        timeout: Per-request timeout handed to the HTTP client.

    Returns:
        The parsed JSON response body on 2xx, or ``{}`` when a 2xx
        response carries no body.

    Raises:
        InboxError: On a non-2xx response (with the response status and,
            when the body is valid JSON, that body as ``detail``), or when
            signing, sending, or decoding a 2xx body fails (``status_code``
            is then the default 500 and the original exception is chained).
    """
    base = _internal_url(app_name)
    url = f"{base}/internal/inbox"
    try:
        headers = {INBOX_HEADER: "1", **sign_internal_headers(app_name)}
        resp = await _get_client().post(
            url,
            json=activity,
            headers=headers,
            timeout=timeout,
        )
        if 200 <= resp.status_code < 300:
            # A 2xx with an empty body (e.g. 202/204) is still a success;
            # decoding it would raise and misreport the delivery as failed.
            if not resp.content:
                return {}
            return resp.json()

        msg = f"Inbox {app_name} returned {resp.status_code}"
        try:
            detail = resp.json()
        except ValueError:
            # The error body is optional context only; a missing or
            # non-JSON body must not mask the real status code.
            detail = None
        log.error(msg)
        raise InboxError(msg, status_code=resp.status_code, detail=detail)
    except InboxError:
        # Already in its final form (raised just above); pass it through.
        raise
    except Exception as exc:
        # Boundary handler: callers only ever need to catch InboxError.
        msg = f"Inbox {app_name} failed: {exc}"
        log.error(msg)
        raise InboxError(msg) from exc
|