"""Client for sending internal AP activities to other services. Replaces ``call_action`` for AP-shaped inter-service writes. Uses the same HMAC authentication as actions/data clients. """ from __future__ import annotations import logging import os import httpx from shared.infrastructure.internal_auth import sign_internal_headers log = logging.getLogger(__name__) _client: httpx.AsyncClient | None = None _DEFAULT_TIMEOUT = 5.0 INBOX_HEADER = "X-Internal-Inbox" class InboxError(Exception): """Raised when an internal inbox call fails.""" def __init__(self, message: str, status_code: int = 500, detail: dict | None = None): super().__init__(message) self.status_code = status_code self.detail = detail def _get_client() -> httpx.AsyncClient: global _client if _client is None or _client.is_closed: _client = httpx.AsyncClient( timeout=httpx.Timeout(_DEFAULT_TIMEOUT), follow_redirects=False, ) return _client def _internal_url(app_name: str) -> str: env_key = f"INTERNAL_URL_{app_name.upper()}" return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/") async def send_internal_activity( app_name: str, activity: dict, *, timeout: float = _DEFAULT_TIMEOUT, ) -> dict: """POST an AP activity to the target service's /internal/inbox. Returns the parsed JSON response on 2xx. Raises ``InboxError`` on network errors or non-2xx responses. """ base = _internal_url(app_name) url = f"{base}/internal/inbox" try: headers = {INBOX_HEADER: "1", **sign_internal_headers(app_name)} resp = await _get_client().post( url, json=activity, headers=headers, timeout=timeout, ) if 200 <= resp.status_code < 300: return resp.json() msg = f"Inbox {app_name} returned {resp.status_code}" detail = None try: detail = resp.json() except Exception: pass log.error(msg) raise InboxError(msg, status_code=resp.status_code, detail=detail) except InboxError: raise except Exception as exc: msg = f"Inbox {app_name} failed: {exc}" log.error(msg) raise InboxError(msg) from exc