Remove cross-DB relationships (CartItem.product, CartItem.market_place, OrderItem.product) that break with per-service databases. Denormalize product and marketplace fields onto cart_items/order_items at write time. - Add AP internal inbox infrastructure (shared/infrastructure/internal_inbox*) for synchronous inter-service writes via HMAC-authenticated POST - Cart inbox blueprint handles Add/Remove/Update rose:CartItem activities - Market app sends AP activities to cart inbox instead of writing CartItem directly - Cart services use denormalized columns instead of cross-DB hydration/joins - Add marketplaces-by-ids data endpoint to market service - Alembic migration adds denormalized columns to cart_items and order_items - Add OAuth device flow auth to market scraper persist_api (artdag client pattern) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
50 lines
1.7 KiB
Python
50 lines
1.7 KiB
Python
"""Internal AP inbox dispatch for synchronous inter-service writes.
|
|
|
|
Each service can register handlers for (activity_type, object_type) pairs.
|
|
When an internal AP activity arrives, it is routed to the matching handler.
|
|
|
|
This mirrors the federated ``dispatch_inbox_activity()`` pattern but for
|
|
HMAC-authenticated internal service-to-service calls.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Callable, Awaitable
|
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
log = logging.getLogger(__name__)

# Handler registry for internal AP activities.
#   key:   (activity_type, object_type)
#   value: async handler(session, body) -> dict
_internal_handlers: dict[
    tuple[str, str],
    Callable[[AsyncSession, dict], Awaitable[dict]],
] = {}
|
|
|
|
|
|
def register_internal_handler(
    activity_type: str,
    object_type: str,
    handler: Callable[[AsyncSession, dict], Awaitable[dict]],
) -> None:
    """Bind *handler* to an (activity type, object type) pair.

    The handler is stored in the module-level registry under the tuple
    ``(activity_type, object_type)``. If that pair already has a handler,
    a warning is logged and the existing entry is replaced.
    """
    registry_key = activity_type, object_type
    already_registered = registry_key in _internal_handlers
    if already_registered:
        # Replacing is permitted, but is made visible in the logs.
        log.warning("Overwriting internal handler for %s", registry_key)
    _internal_handlers[registry_key] = handler
|
|
|
|
|
|
async def dispatch_internal_activity(session: AsyncSession, body: dict) -> dict:
    """Route an internal AP activity to the correct handler.

    The handler is selected by the pair
    ``(body["type"], body["object"]["type"])``. A missing ``type`` is treated
    as ``""``; a missing, falsy or non-dict ``object`` yields an object type
    of ``""``.

    Args:
        session: Async SQLAlchemy session, passed to the handler unchanged.
        body: The activity payload, passed to the handler unchanged.

    Returns:
        The dict produced by awaiting the matching handler.

    Raises:
        ValueError: If no handler is registered for the pair, or if either
            type is not a string (for example a list-valued ``type``).
    """
    activity_type = body.get("type", "")
    obj = body.get("object") or {}
    object_type = obj.get("type", "") if isinstance(obj, dict) else ""

    key = (activity_type, object_type)

    # A list- or dict-valued "type" would make the key unhashable, so the
    # registry lookup would raise TypeError rather than the documented
    # ValueError. The registry is annotated as keyed by string pairs, so a
    # non-string type is treated as unknown and rejected up front.
    if not (isinstance(activity_type, str) and isinstance(object_type, str)):
        raise ValueError(f"No internal handler for {key}")

    handler = _internal_handlers.get(key)
    if handler is None:
        raise ValueError(f"No internal handler for {key}")

    log.info("Dispatching internal activity %s", key)
    return await handler(session, body)
|