Files
mono/shared/infrastructure/data_client.py
giles c015f3f02f Security audit: fix IDOR, add rate limiting, HMAC auth, token hashing, XSS sanitization
Critical: Add ownership checks to all order routes (IDOR fix).
High: Redis rate limiting on auth endpoints, HMAC-signed internal
service calls replacing header-presence-only checks, nh3 HTML
sanitization on ghost_sync and product import, internal auth on
market API endpoints, SHA-256 hashed OAuth grant/code tokens.
Medium: SECRET_KEY production guard, AP signature enforcement,
is_admin param removal, cart_sid validation, SSRF protection on
remote actor fetch.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 13:30:27 +00:00

95 lines
2.7 KiB
Python

"""Internal data client for cross-app read operations.
Each coop app exposes JSON data endpoints at ``/internal/data/{query}``.
This module provides helpers to fetch that data so that callers don't
need direct access to another app's DB session or service layer.
Same pattern as the fragment client but returns parsed JSON instead of HTML.
"""
from __future__ import annotations
import logging
import os
import httpx
from shared.infrastructure.internal_auth import sign_internal_headers
log = logging.getLogger(__name__)
# Re-usable async client (created lazily, one per process)
_client: httpx.AsyncClient | None = None
# Default request timeout (seconds)
_DEFAULT_TIMEOUT = 3.0
# Header sent on every data request so providers can gate access.
DATA_HEADER = "X-Internal-Data"
class DataError(Exception):
"""Raised when an internal data fetch fails."""
def __init__(self, message: str, status_code: int = 500):
super().__init__(message)
self.status_code = status_code
def _get_client() -> httpx.AsyncClient:
global _client
if _client is None or _client.is_closed:
_client = httpx.AsyncClient(
timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
follow_redirects=False,
)
return _client
def _internal_url(app_name: str) -> str:
"""Resolve the Docker-internal base URL for *app_name*."""
env_key = f"INTERNAL_URL_{app_name.upper()}"
return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")
async def fetch_data(
app_name: str,
query_name: str,
*,
params: dict | None = None,
timeout: float = _DEFAULT_TIMEOUT,
required: bool = True,
) -> dict | list | None:
"""GET JSON from ``{INTERNAL_URL_APP}/internal/data/{query_name}``.
Returns parsed JSON (dict or list) on success.
When *required* is True (default), raises ``DataError`` on failure.
When *required* is False, returns None on failure.
"""
base = _internal_url(app_name)
url = f"{base}/internal/data/{query_name}"
try:
headers = {DATA_HEADER: "1", **sign_internal_headers(app_name)}
resp = await _get_client().get(
url,
params=params,
headers=headers,
timeout=timeout,
)
if resp.status_code == 200:
return resp.json()
msg = f"Data {app_name}/{query_name} returned {resp.status_code}"
if required:
log.error(msg)
raise DataError(msg, status_code=resp.status_code)
log.warning(msg)
return None
except DataError:
raise
except Exception as exc:
msg = f"Data {app_name}/{query_name} failed: {exc}"
if required:
log.error(msg)
raise DataError(msg) from exc
log.warning(msg)
return None