mono/shared/infrastructure/actions.py
giles 3b707ec8a0 Decouple all cross-app service calls to HTTP endpoints
Replace every direct cross-app services.* call with HTTP-based
communication: call_action() for writes, fetch_data() for reads.
Each app now registers only its own domain service.

Infrastructure:
- shared/infrastructure/actions.py — POST client for /internal/actions/
- shared/infrastructure/data_client.py — GET client for /internal/data/
- shared/contracts/dtos.py — dto_to_dict/dto_from_dict serialization

Action endpoints (writes):
- events: 8 handlers (ticket adjust, claim/confirm, toggle, adopt)
- market: 2 handlers (create/soft-delete marketplace)
- cart: 1 handler (adopt cart for user)

Data endpoints (reads):
- blog: 4 (post-by-slug/id, posts-by-ids, search-posts)
- events: 10 (pending entries/tickets, entries/tickets for page/order,
  entry-ids, associated-entries, calendars, visible-entries-for-period)
- market: 1 (marketplaces-for-container)
- cart: 1 (cart-summary)

Service registration cleanup:
- blog→blog+federation, events→calendar+federation,
  market→market+federation, cart→cart only,
  federation→federation only, account→nothing
- Stubs reduced to minimal StubFederationService

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-25 03:01:38 +00:00

90 lines
2.6 KiB
Python

"""Internal action client for cross-app write operations.

Each coop app exposes JSON action endpoints at ``/internal/actions/{name}``.
This module provides helpers to call those endpoints so that callers don't
need direct access to another app's DB session or service layer.

Failures raise ``ActionError`` so callers can handle or propagate them.
"""
from __future__ import annotations

import logging
import os

import httpx

log = logging.getLogger(__name__)

# Re-usable async client (created lazily, one per process)
_client: httpx.AsyncClient | None = None

# Default request timeout (seconds); longer than fragments since these are writes
_DEFAULT_TIMEOUT = 5.0

# Header sent on every action request so providers can gate access.
ACTION_HEADER = "X-Internal-Action"


class ActionError(Exception):
    """Raised when an internal action call fails."""

    def __init__(self, message: str, status_code: int = 500, detail: dict | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail


def _get_client() -> httpx.AsyncClient:
    """Return the shared async client, creating it on first use or after close."""
    global _client
    if _client is None or _client.is_closed:
        _client = httpx.AsyncClient(
            timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
            follow_redirects=False,
        )
    return _client


def _internal_url(app_name: str) -> str:
    """Resolve the Docker-internal base URL for *app_name*."""
    env_key = f"INTERNAL_URL_{app_name.upper()}"
    return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")


async def call_action(
    app_name: str,
    action_name: str,
    *,
    payload: dict | None = None,
    timeout: float = _DEFAULT_TIMEOUT,
) -> dict:
    """POST JSON to ``{INTERNAL_URL_APP}/internal/actions/{action_name}``.

    Returns the parsed JSON response on 2xx.
    Raises ``ActionError`` on network errors or non-2xx responses.
    """
    base = _internal_url(app_name)
    url = f"{base}/internal/actions/{action_name}"
    try:
        resp = await _get_client().post(
            url,
            json=payload or {},
            headers={ACTION_HEADER: "1"},
            timeout=timeout,
        )
        if 200 <= resp.status_code < 300:
            return resp.json()
        msg = f"Action {app_name}/{action_name} returned {resp.status_code}"
        # Attach the provider's JSON error body when there is one.
        detail = None
        try:
            detail = resp.json()
        except Exception:
            pass
        log.error(msg)
        raise ActionError(msg, status_code=resp.status_code, detail=detail)
    except ActionError:
        # Already in the right shape; do not re-wrap.
        raise
    except Exception as exc:
        # Network errors, timeouts, and invalid JSON on a 2xx response land here.
        msg = f"Action {app_name}/{action_name} failed: {exc}"
        log.error(msg)
        raise ActionError(msg) from exc
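
Caller-side usage might look like the sketch below. It is an illustration only, not code from this repository: `fake_call_action` is a hypothetical stand-in for `call_action()` that does no network I/O, the action name `adopt-cart-for-user` and its `user_id` payload are assumed, and `ActionError` and `internal_url` are re-declared so the block runs on its own with the standard library alone.

```python
from __future__ import annotations

import asyncio
import os


class ActionError(Exception):
    """Stand-in mirroring the ActionError defined in the module above."""

    def __init__(self, message: str, status_code: int = 500, detail: dict | None = None):
        super().__init__(message)
        self.status_code = status_code
        self.detail = detail


def internal_url(app_name: str) -> str:
    # Same resolution rule as _internal_url(): env override first,
    # otherwise the Docker service name on port 8000.
    env_key = f"INTERNAL_URL_{app_name.upper()}"
    return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")


async def fake_call_action(app_name: str, action_name: str, *, payload: dict | None = None) -> dict:
    """Hypothetical stub with the same call shape as call_action()."""
    if action_name == "adopt-cart-for-user":  # assumed action name
        return {"ok": True, "echo": payload or {}}
    raise ActionError(
        f"Action {app_name}/{action_name} returned 404",
        status_code=404,
        detail={"error": "unknown action"},
    )


async def adopt_cart(user_id: int, call=fake_call_action) -> dict | None:
    """Treat a 404 from the provider as 'nothing to do'; propagate anything else."""
    try:
        return await call("cart", "adopt-cart-for-user", payload={"user_id": user_id})
    except ActionError as exc:
        if exc.status_code == 404:
            return None
        raise


if __name__ == "__main__":
    print(internal_url("cart"))
    print(asyncio.run(adopt_cart(7)))
```

Because every failure surfaces as `ActionError` with a `status_code`, a caller can branch on the status without knowing whether the failure came from the network or from the provider. Network failures carry the default status of 500.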