Critical: Add ownership checks to all order routes (IDOR fix). High: Redis rate limiting on auth endpoints, HMAC-signed internal service calls replacing header-presence-only checks, nh3 HTML sanitization on ghost_sync and product import, internal auth on market API endpoints, SHA-256 hashed OAuth grant/code tokens. Medium: SECRET_KEY production guard, AP signature enforcement, is_admin param removal, cart_sid validation, SSRF protection on remote actor fetch. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
95 lines
2.7 KiB
Python
95 lines
2.7 KiB
Python
"""Internal data client for cross-app read operations.
|
|
|
|
Each coop app exposes JSON data endpoints at ``/internal/data/{query}``.
|
|
This module provides helpers to fetch that data so that callers don't
|
|
need direct access to another app's DB session or service layer.
|
|
|
|
Same pattern as the fragment client but returns parsed JSON instead of HTML.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
|
|
import httpx
|
|
|
|
from shared.infrastructure.internal_auth import sign_internal_headers
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
# Re-usable async client (created lazily, one per process)
|
|
_client: httpx.AsyncClient | None = None
|
|
|
|
# Default request timeout (seconds)
|
|
_DEFAULT_TIMEOUT = 3.0
|
|
|
|
# Header sent on every data request so providers can gate access.
|
|
DATA_HEADER = "X-Internal-Data"
|
|
|
|
|
|
class DataError(Exception):
|
|
"""Raised when an internal data fetch fails."""
|
|
|
|
def __init__(self, message: str, status_code: int = 500):
|
|
super().__init__(message)
|
|
self.status_code = status_code
|
|
|
|
|
|
def _get_client() -> httpx.AsyncClient:
|
|
global _client
|
|
if _client is None or _client.is_closed:
|
|
_client = httpx.AsyncClient(
|
|
timeout=httpx.Timeout(_DEFAULT_TIMEOUT),
|
|
follow_redirects=False,
|
|
)
|
|
return _client
|
|
|
|
|
|
def _internal_url(app_name: str) -> str:
|
|
"""Resolve the Docker-internal base URL for *app_name*."""
|
|
env_key = f"INTERNAL_URL_{app_name.upper()}"
|
|
return os.getenv(env_key, f"http://{app_name}:8000").rstrip("/")
|
|
|
|
|
|
async def fetch_data(
|
|
app_name: str,
|
|
query_name: str,
|
|
*,
|
|
params: dict | None = None,
|
|
timeout: float = _DEFAULT_TIMEOUT,
|
|
required: bool = True,
|
|
) -> dict | list | None:
|
|
"""GET JSON from ``{INTERNAL_URL_APP}/internal/data/{query_name}``.
|
|
|
|
Returns parsed JSON (dict or list) on success.
|
|
When *required* is True (default), raises ``DataError`` on failure.
|
|
When *required* is False, returns None on failure.
|
|
"""
|
|
base = _internal_url(app_name)
|
|
url = f"{base}/internal/data/{query_name}"
|
|
try:
|
|
headers = {DATA_HEADER: "1", **sign_internal_headers(app_name)}
|
|
resp = await _get_client().get(
|
|
url,
|
|
params=params,
|
|
headers=headers,
|
|
timeout=timeout,
|
|
)
|
|
if resp.status_code == 200:
|
|
return resp.json()
|
|
msg = f"Data {app_name}/{query_name} returned {resp.status_code}"
|
|
if required:
|
|
log.error(msg)
|
|
raise DataError(msg, status_code=resp.status_code)
|
|
log.warning(msg)
|
|
return None
|
|
except DataError:
|
|
raise
|
|
except Exception as exc:
|
|
msg = f"Data {app_name}/{query_name} failed: {exc}"
|
|
if required:
|
|
log.error(msg)
|
|
raise DataError(msg) from exc
|
|
log.warning(msg)
|
|
return None
|